reddb_server/runtime/statement_frame.rs
1use std::cell::RefCell;
2use std::collections::HashSet;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use super::impl_core::{
7 collections_referenced, current_auth_identity, current_connection_id, current_tenant,
8 has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
9 query_has_volatile_builtin, query_is_ask_statement, ConfigSnapshotGuard, CurrentSnapshotGuard,
10 SecretStoreGuard, SnapshotContext, TxLocalTenantGuard,
11};
12use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
13use crate::api::{RedDBError, RedDBResult};
14use crate::auth::Role;
15use crate::storage::query::ast::QueryExpr;
16use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
17use crate::storage::transaction::snapshot::{Snapshot, Xid};
18
/// Coarse privilege classification for a statement, computed once at
/// frame-build time from the SQL text. Mirrors the three-role auth
/// model (`Role::Read < Role::Write < Role::Admin`) so the frame can
/// answer "can this identity run this statement?" without re-walking
/// the parsed `QueryExpr` at every call site.
///
/// This is only a coarse, prefix-based pre-check. Passing it is not
/// sufficient: the frame's `check_query_privilege` still runs the full
/// `RedDBRuntime::check_query_privilege` walk afterwards.
///
/// `None` means the coarse gate does not apply. That covers transaction
/// control, SET / RESET, PREPARE / EXECUTE, USE, and any statement whose
/// leading keyword the prefix classifier does not recognise. Such
/// statements are never rejected by this gate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Privilege {
    /// Read-only data access (SELECT, WITH, EXPLAIN, SHOW, DESCRIBE,
    /// KV GET, ...). Satisfied by any role for which `Role::can_read`
    /// holds.
    Read,
    /// Mutation of user data or schema-author DDL (INSERT, UPDATE,
    /// DELETE, CREATE / ALTER / DROP, ...). Requires `Role::can_write`.
    Write,
    /// Authority statements. The prefix classifier assigns this to
    /// GRANT / REVOKE. Other authority statements (ALTER USER, APPLY /
    /// ROLLBACK MIGRATION, IAM policy mutation) are expected to be
    /// upgraded by finer checks at the call site. Requires
    /// `Role::can_admin`.
    Admin,
    /// The coarse gate does not apply (BEGIN, COMMIT, ROLLBACK, SET, ...,
    /// or an unrecognised leading keyword). Always satisfied.
    None,
}
45
46impl Privilege {
47 /// `true` iff `role` is sufficient to execute a statement carrying
48 /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
49 /// Admin` containment used by the auth fallback path.
50 pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
51 match self {
52 Self::None => true,
53 Self::Read => role.can_read(),
54 Self::Write => role.can_write(),
55 Self::Admin => role.can_admin(),
56 }
57 }
58}
59
/// Coarse lock intent for a statement, computed once at frame-build
/// time. Maps onto the storage layer's `LockMode` matrix downstream
/// but stays decoupled here so the runtime can answer "does this
/// statement need the lock manager at all?" without a `use storage::`
/// at every call site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LockIntent {
    /// No collection-level lock needed. The lock-acquisition path
    /// short-circuits without consulting `intent_lock_modes_for`. Meant
    /// for transaction-control / session statements (BEGIN, COMMIT,
    /// SET, ...).
    None,
    /// Reader-style intent: read-prefixed statements (SELECT, WITH,
    /// SHOW, EXPLAIN, ...) and the AI search scope. Maps to `(IS, IS)`
    /// at the storage layer.
    Shared,
    /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`)
    /// and CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as
    /// `Exclusive` at this granularity. Call sites that need the finer
    /// distinction still consult `intent_lock_modes_for`.
    Exclusive,
}
79
/// Small, stable interface that *represents* a read statement's
/// execution context. Every read caller that needs to know "under
/// what scope / identity / snapshot am I running, and is there an
/// AS OF floor in effect?" consults this trait — never the
/// underlying thread-locals or runtime fields directly.
///
/// Why it exists: without this trait, the concerns it exposes would
/// fall back into ad-hoc lookups at every read call site
/// (`current_tenant()`, `current_auth_identity()`,
/// `capture_current_snapshot()`, AS OF re-parsing). The trait
/// concentrates them in one place, so future changes (per-statement
/// logging, audit, scope policy) have a single seam to extend.
///
/// Implemented in this module by `StatementExecutionFrame` (one per
/// SQL statement) and `EffectiveScope` (AI command entry points).
pub(crate) trait ReadFrame {
    /// Effective tenant scope for the statement after WITHIN /
    /// SET LOCAL TENANT / SET TENANT resolution. `None` means
    /// "no tenant bound" (RLS deny-default applies).
    fn effective_scope(&self) -> Option<&str>;

    /// Authenticated identity observed at frame-build time, if any.
    /// Returns `(username, role)` so callers can render audit lines
    /// or feed RLS policy lookups without re-reading thread-locals.
    fn identity(&self) -> Option<(&str, Role)>;

    /// MVCC snapshot the statement reads against. For autocommit
    /// this is a fresh snapshot; inside an active transaction it
    /// is the txn's snapshot; under AS OF it is the resolved
    /// historical xid.
    fn snapshot(&self) -> &Snapshot;

    /// AS OF xid floor when AS OF was applied for this statement,
    /// `None` for live reads. Lets downstream callers gate behaviour
    /// on historical-read mode without re-parsing the query.
    fn as_of_floor(&self) -> Option<Xid>;

    /// Stable result-cache key for the statement (already mixes in the
    /// effective tenant and identity).
    fn cache_key(&self) -> &str;

    /// Whether the statement is safe to serve from, or populate, the
    /// result cache. Combines two underlying signals:
    ///
    /// * the query does not call a volatile builtin (e.g. `NOW()`,
    ///   `RANDOM()`, `UUID()`), which would change between calls;
    /// * the connection is not inside an active transaction with
    ///   uncommitted writes that other readers shouldn't observe.
    ///
    /// SELECT cache call sites (read and write) consult this method
    /// instead of re-deriving safety from globals or poking the
    /// frame's private fields. Without it, every cache call site would
    /// have to re-run `query_has_volatile_builtin` plus
    /// `result_cache_safe(conn_id)` inline.
    fn should_cache_result(&self) -> bool;

    /// Coarse privilege class the statement requires. The statement
    /// frame computes it once at build time from the SQL prefix.
    /// Read/write dispatch sites consult this instead of re-classifying
    /// the parsed `QueryExpr` inline at every call site.
    ///
    /// Without this method, every privilege gate would have to
    /// recompute the (action, resource) classification from the
    /// parsed expression and re-check the role hierarchy inline.
    fn required_privilege(&self) -> Privilege;

    /// Coarse collection-level lock intent the statement implies.
    /// `None` lets the lock-acquisition path short-circuit without
    /// touching the lock manager.
    ///
    /// Without this method, the lock-acquisition path would always
    /// invoke `intent_lock_modes_for` (which walks the parsed
    /// expression), even for transaction-control / SET statements
    /// that need no collection lock at all.
    fn lock_intent(&self) -> LockIntent;

    /// Set of collection ids the calling identity is allowed to
    /// observe under the active `(tenant, role)` scope. Computed once
    /// at frame-build time via the `AuthStore` visible-collections
    /// cache (see `auth::scope_cache`). `AuthorizedSearch` uses it to
    /// pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
    /// any similarity score is computed (issue #119).
    ///
    /// `None` means the frame was built without an auth store wired
    /// (embedded / single-tenant tests run that way). AI search call
    /// sites refuse to proceed with `None`, which is the deny-default
    /// the issue requires; pure SELECT paths fall back to the existing
    /// per-row RLS gate.
    fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
}
168
/// Cheap first-word classification of a SQL statement, used at
/// frame-build time to derive `Privilege` + `LockIntent` without
/// re-parsing the query. Matches the keywords that the legacy
/// inline checks in `RedDBRuntime::check_query_privilege` and
/// `intent_lock_modes_for` already key on.
///
/// Returns one of `"read"`, `"write"`, `"ddl"`, `"admin"`, `"control"`
/// or `"unknown"`.
///
/// Before classifying, it skips any run of leading whitespace,
/// `-- line` comments and `/* block */` comments. It also ignores the
/// empty tokens produced by repeated separators (`KV   GET`,
/// `(SELECT ...)`). Cosmetic formatting therefore cannot flip a write
/// into "unknown" (which would skip the coarse gate and intent locks)
/// or a read into "write".
fn statement_kind(query: &str) -> &'static str {
    let body = skip_leading_sql_comments(query);
    // Separators: whitespace, `(` (e.g. `(SELECT ...)`) and `;`.
    let mut tokens = body
        .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
        .filter(|token| !token.is_empty());
    let first = tokens.next().unwrap_or("");
    let second = tokens.next().unwrap_or("");

    // Two-word key-value / vault commands: the verb decides read vs write.
    if first.eq_ignore_ascii_case("KV") {
        let is_read = ["GET", "LIST", "WATCH"]
            .iter()
            .any(|verb| second.eq_ignore_ascii_case(verb));
        return if is_read { "read" } else { "write" };
    }
    if first.eq_ignore_ascii_case("VAULT") {
        let is_read = ["LIST", "WATCH", "HISTORY"]
            .iter()
            .any(|verb| second.eq_ignore_ascii_case(verb));
        return if is_read { "read" } else { "write" };
    }

    // ASCII-uppercase the first word into a fixed buffer so we can compare
    // without allocating (SQL keywords are ASCII). Words longer than the
    // buffer are truncated to 16 bytes and therefore never match a keyword.
    let mut buf = [0u8; 16];
    let len = first.len().min(buf.len());
    for (dst, src) in buf.iter_mut().zip(first.bytes()) {
        *dst = src.to_ascii_uppercase();
    }
    match &buf[..len] {
        b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" | b"RANK"
        | b"APPROX" | b"APPROXIMATE" | b"ZRANK" | b"ZRANGE" | b"LIST" | b"WATCH" | b"GET"
        | b"HISTORY" => "read",
        b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
        b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
        b"GRANT" | b"REVOKE" => "admin",
        b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
        | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
        b"PUT" | b"INCR" | b"DECR" | b"ADD" | b"ROTATE" | b"PURGE" | b"UNSEAL" | b"INVALIDATE" => {
            "write"
        }
        _ => "unknown",
    }
}

/// Skip any run of leading whitespace, `-- line` comments and
/// `/* block */` comments (treated as non-nesting). An unterminated
/// comment swallows the rest of the input and yields `""`.
fn skip_leading_sql_comments(query: &str) -> &str {
    let mut rest = query.trim_start();
    loop {
        if let Some(after) = rest.strip_prefix("--") {
            rest = after.split_once('\n').map_or("", |(_, tail)| tail).trim_start();
        } else if let Some(after) = rest.strip_prefix("/*") {
            rest = after.split_once("*/").map_or("", |(_, tail)| tail).trim_start();
        } else {
            return rest;
        }
    }
}
229
230fn classify_privilege(query: &str) -> Privilege {
231 match statement_kind(query) {
232 "read" => Privilege::Read,
233 "write" => Privilege::Write,
234 // DDL is gated at `Role::Write` in the legacy fallback (see
235 // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
236 // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
237 // GRANT / REVOKE upgrade to Admin via finer checks at the call
238 // site — the frame surfaces only the coarse class.
239 "ddl" => Privilege::Write,
240 "admin" => Privilege::Admin,
241 _ => Privilege::None,
242 }
243}
244
245fn classify_lock_intent(query: &str) -> LockIntent {
246 match statement_kind(query) {
247 "read" => LockIntent::Shared,
248 "write" | "ddl" => LockIntent::Exclusive,
249 _ => LockIntent::None,
250 }
251}
252
/// Per-statement execution context captured once by
/// `StatementExecutionFrame::build`.
///
/// Bundles the tenant / identity / snapshot view, result-cache
/// bookkeeping, coarse privilege + lock classification, and the row-buffer
/// arena for a single statement. Read call sites consume it through the
/// `ReadFrame` trait.
pub(super) struct StatementExecutionFrame {
    /// Transaction-local tenant entry for this connection, cloned from
    /// `runtime.inner.tx_local_tenants`. The outer `None` means the
    /// connection has no entry in that map. `install` hands it to
    /// `TxLocalTenantGuard::install`.
    tx_local_tenant: Option<Option<String>>,
    /// MVCC snapshot returned by `RedDBRuntime::statement_snapshot`
    /// (live, or historical under AS OF).
    snapshot: Snapshot,
    /// Transaction ids owned by this connection
    /// (`RedDBRuntime::own_transaction_xids`). Forwarded into the
    /// installed `SnapshotContext`.
    own_xids: HashSet<Xid>,
    /// Result-cache key derived from the raw query text via
    /// `result_cache_key`.
    cache_key: String,
    /// `true` when the query calls a volatile builtin or is an ASK
    /// statement. Such statements are never result-cached and bypass
    /// the plan cache.
    is_volatile_query: bool,
    /// Value of `RedDBRuntime::result_cache_safe` for this connection
    /// at build time.
    cache_safe: bool,
    /// Effective tenant captured at frame-build time after WITHIN /
    /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
    /// so the `ReadFrame` interface can return a borrow without
    /// re-touching the thread-local stack.
    effective_scope: Option<String>,
    /// Auth identity captured at frame-build time. `None` for
    /// embedded / anonymous callers.
    identity: Option<(String, Role)>,
    /// `Some(xid)` when AS OF resolved to a historical xid; `None`
    /// for live reads.
    as_of_floor: Option<Xid>,
    /// True when the statement snapshot can require tuple versions that
    /// current secondary indexes no longer contain. Set when AS OF was
    /// applied or the connection has an active transaction context.
    requires_index_fallback: bool,
    /// Privilege class required by the statement, derived from the
    /// SQL text at frame-build time. Read/write dispatch sites
    /// consult this instead of re-classifying the parsed expression.
    required_privilege: Privilege,
    /// Collection-level lock intent the statement implies. The
    /// lock-acquisition path short-circuits when this is `None`.
    lock_intent: LockIntent,
    /// Set of collection ids the active `(tenant, role)` scope is
    /// allowed to observe. Computed at frame-build time via the
    /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
    /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
    /// scoring (issue #119). `None` when no auth store is wired
    /// (embedded test mode) or no identity was captured; AI search
    /// refuses on `None`.
    visible_collections: Option<HashSet<String>>,
    /// Per-owner buffer arena for query-result row chunks (#885). Owned
    /// by the frame because the frame already owns the query lifecycle.
    /// It is lent to the row-streaming path
    /// (`execute_runtime_table_query_in`) so chunk buffers are reused
    /// across the statement's chunk-fetches instead of allocated fresh
    /// per chunk. Reclaimed when the frame drops at statement end. There
    /// is no `thread_local!` scratch, which would be unsound under
    /// tokio's work-stealing runtime.
    row_arena: Rc<RefCell<super::query_exec::RowBufferArena>>,
}
297
/// Guards returned by `StatementExecutionFrame::install`.
///
/// Holding this value keeps the statement's tx-local tenant, config
/// snapshot, secret store and MVCC snapshot guards alive. Dropping it
/// drops the guards in field declaration order. Presumably each guard's
/// `Drop` restores the previous thread-local state — confirm against the
/// guard types.
pub(super) struct StatementFrameGuards {
    _tx_local_guard: TxLocalTenantGuard,
    _config_snapshot_guard: ConfigSnapshotGuard,
    _secret_store_guard: SecretStoreGuard,
    _snapshot_guard: CurrentSnapshotGuard,
}
304
/// Output of `StatementExecutionFrame::prepare_statement`.
pub(super) struct PreparedStatement {
    /// Parsed expression (possibly from the plan cache) after view
    /// references have been rewritten.
    pub(super) expr: QueryExpr,
    /// Query mode detected from the raw SQL text; never
    /// `QueryMode::Unknown`.
    pub(super) mode: QueryMode,
}
309
310impl StatementExecutionFrame {
311 pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
312 let conn_id = current_connection_id();
313 let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
314 let own_xids = runtime.own_transaction_xids(conn_id);
315 let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
316 let requires_index_fallback =
317 as_of_floor.is_some() || runtime.inner.tx_contexts.read().contains_key(&conn_id);
318 let cache_key = result_cache_key(query);
319 let is_volatile_query = query_has_volatile_builtin(query) || query_is_ask_statement(query);
320 let cache_safe = runtime.result_cache_safe(conn_id);
321 // Capture identity + effective scope under the same
322 // thread-local view that the cache key was built from, so
323 // the Interface and the cache key agree on what "this
324 // statement" means.
325 let effective_scope = current_tenant();
326 let identity = current_auth_identity();
327
328 // Coarse classification of the statement, computed once from
329 // the SQL prefix so downstream callers don't re-derive it
330 // from the parsed `QueryExpr` at every privilege / lock site.
331 let required_privilege = classify_privilege(query);
332 let lock_intent = classify_lock_intent(query);
333
334 // Issue #119: resolve the visible-collections set for the
335 // active (tenant, role) scope. Only meaningful when an auth
336 // store is wired *and* an identity was captured — embedded
337 // anonymous callers fall back to `None`, and AI search call
338 // sites refuse on `None`.
339 let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
340 {
341 (Some(store), Some((principal, role))) => {
342 let collections = runtime.inner.db.store().list_collections();
343 Some(store.visible_collections_for_scope(
344 effective_scope.as_deref(),
345 *role,
346 principal,
347 &collections,
348 ))
349 }
350 _ => None,
351 };
352
353 Ok(Self {
354 tx_local_tenant,
355 snapshot,
356 own_xids,
357 cache_key,
358 is_volatile_query,
359 cache_safe,
360 effective_scope,
361 identity,
362 as_of_floor,
363 requires_index_fallback,
364 required_privilege,
365 lock_intent,
366 visible_collections,
367 row_arena: Rc::new(RefCell::new(super::query_exec::RowBufferArena::new())),
368 })
369 }
370
371 /// Lend the frame's per-owner row-buffer arena (#885) to the
372 /// row-streaming path. Returns a cloned `Rc` handle; the frame remains
373 /// the owner and the arena is reclaimed when the frame drops at
374 /// statement end.
375 pub(super) fn row_arena(&self) -> Rc<RefCell<super::query_exec::RowBufferArena>> {
376 Rc::clone(&self.row_arena)
377 }
378
379 pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
380 StatementFrameGuards {
381 _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
382 _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
383 _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
384 _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
385 snapshot: self.snapshot.clone(),
386 manager: Arc::clone(&runtime.inner.snapshot_manager),
387 own_xids: self.own_xids.clone(),
388 requires_index_fallback: self.requires_index_fallback,
389 }),
390 }
391 }
392
393 pub(super) fn cache_key(&self) -> &str {
394 &self.cache_key
395 }
396
397 pub(super) fn can_read_result_cache(&self) -> bool {
398 // Delegates to the `ReadFrame` Interface so the volatile +
399 // active-tx safety decision lives in exactly one place.
400 <Self as ReadFrame>::should_cache_result(self)
401 }
402
403 pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
404 // Cache-safety (volatile builtin, active-tx writes) comes from
405 // the Interface; the rest are write-side payload heuristics
406 // (statement shape, result size) that aren't part of the
407 // safety contract.
408 <Self as ReadFrame>::should_cache_result(self)
409 && result.statement_type == "select"
410 && result.engine != "vault"
411 && result.engine != "runtime-rank"
412 // `QUEUE READ` is a stateful read: a delayed message
413 // (issue #722) becomes deliverable over time without a
414 // producer push to invalidate the cache, so a cached empty
415 // result would hide it. Skip caching entirely.
416 && result.statement != "queue_group_read"
417 && result.result.pre_serialized_json.is_none()
418 // Graph-analytics TVF output (issue #802) is deterministic and
419 // expensive to recompute, so it is cached at any row count. The
420 // ≤5-row heuristic only bounds payload size for ordinary SELECTs.
421 && (is_graph_tvf_engine(result.engine) || result.result.records.len() <= 5)
422 }
423
424 pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
425 if self.can_read_result_cache() {
426 runtime.get_result_cache_entry(self.cache_key())
427 } else {
428 None
429 }
430 }
431
432 pub(super) fn write_result_cache(
433 &self,
434 runtime: &RedDBRuntime,
435 result: &RuntimeQueryResult,
436 scopes: HashSet<String>,
437 ) {
438 if self.should_write_result_cache(result) {
439 runtime.put_result_cache_entry(
440 self.cache_key(),
441 RuntimeResultCacheEntry {
442 result: result.clone(),
443 cached_at: std::time::Instant::now(),
444 scopes,
445 },
446 );
447 }
448 }
449
450 pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
451 // Detected via cheap prefix check so non-CTE queries skip the
452 // full parse here. CTE-bearing queries bypass the plan cache
453 // and result cache (rare workload — perf optimization is a
454 // follow-up). Inlining substitutes every CTE reference with
455 // its body as a subquery in FROM, after which the existing
456 // subquery-in-FROM machinery handles execution. Recursive
457 // CTEs are rejected explicitly until fixpoint execution wires
458 // through the runtime.
459 if !has_with_prefix(query) {
460 return Ok(None);
461 }
462 let parsed = crate::storage::query::parser::parse(query)
463 .map_err(|err| RedDBError::Query(err.to_string()))?;
464 if parsed.with_clause.is_some() {
465 let rewritten = crate::storage::query::executors::inline_ctes(parsed)
466 .map_err(|err| RedDBError::Query(err.to_string()))?;
467 return Ok(Some(rewritten));
468 }
469 // No WITH after parse (the prefix matched something else like
470 // `WITHIN` that already routed elsewhere) — fall through to
471 // the normal path with the original query.
472 Ok(None)
473 }
474
475 pub(super) fn prepare_statement(
476 &self,
477 runtime: &RedDBRuntime,
478 query: &str,
479 ) -> RedDBResult<PreparedStatement> {
480 let mode = detect_mode(query);
481 if matches!(mode, QueryMode::Unknown) {
482 return Err(RedDBError::Query("unable to detect query mode".to_string()));
483 }
484
485 // ── Plan cache: reuse only exact-query ASTs ──
486 //
487 // DML statements (INSERT/UPDATE/DELETE) almost always have unique literal
488 // values, so caching them burns CPU on eviction bookkeeping (Vec::remove(0)
489 // shifts the entire LRU list) with zero hit rate. Skip the cache entirely
490 // Plan cache applies to statements whose shape can be
491 // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
492 // reuses the same plan across thousands of varying literals).
493 // INSERT is still bypassed — its shape changes per column set
494 // and bulk paths don't go through here anyway.
495 let first_word = query
496 .trim()
497 .split_ascii_whitespace()
498 .next()
499 .unwrap_or("")
500 .to_ascii_uppercase();
501 let is_insert = first_word == "INSERT";
502 // #1370 — volatile queries ($config / $secret resolve mutable runtime
503 // state at execution time) must bypass the plan cache too. A cached
504 // optimized plan drops the live `$config` resolution, so a later
505 // `SET CONFIG` would be ignored and the query would serve a stale value.
506 // Re-parse fresh every time so the resolver runs against current state.
507 let bypass_plan_cache = is_insert || self.is_volatile_query;
508
509 // Fused normalize+extract: one byte-scan produces both the
510 // cache_key AND the literal bindings. Saves a second Lexer
511 // pass over the query text on every cache hit — dominant
512 // cost on tight UPDATE loops that hit the same shape
513 // thousands of times with varying literals.
514 let (cache_key, prescan_binds) = if bypass_plan_cache {
515 (String::new(), Vec::new())
516 } else {
517 crate::storage::query::planner::cache_key::normalize_and_extract(query)
518 };
519
520 let expr = if bypass_plan_cache {
521 // Bypass plan cache for INSERT — shape varies per query.
522 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
523 } else {
524 // ── Hot path: read lock only (no writer serialization on cache hits) ──
525 //
526 // peek() is a non-mutating probe: no LRU promotion, no touch().
527 // This lets concurrent readers proceed without blocking each other.
528 // On hit we bind literals if needed and return immediately.
529 // Only on miss do we drop to a write lock to parse + insert.
530 let hit = {
531 let plan_cache = runtime.inner.query_cache.read();
532 plan_cache.peek(&cache_key).map(|cached| {
533 let parameter_count = cached.parameter_count;
534 let optimized = cached.plan.optimized.clone();
535 let exact_query = cached.exact_query.clone();
536 (parameter_count, optimized, exact_query)
537 })
538 };
539
540 if let Some((parameter_count, optimized, exact_query)) = hit {
541 if parameter_count > 0 {
542 // Shape hit: use the binds extracted during normalise.
543 let shape_binds = prescan_binds.clone();
544 if let Some(bound) =
545 crate::storage::query::planner::shape::bind_parameterized_query(
546 &optimized,
547 &shape_binds,
548 parameter_count,
549 )
550 {
551 bound
552 } else if exact_query.as_deref() == Some(query) {
553 // Bind failed but exact query matches — use as-is.
554 optimized
555 } else {
556 // Bind failed and literals differ: re-parse fresh.
557 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
558 }
559 } else {
560 // No parameters means either there truly are no literals,
561 // or this statement type does not participate in shape
562 // parameterization (for example graph/queue commands).
563 // Reusing a normalized-cache hit across a different exact
564 // query can therefore leak stale literals into execution.
565 if exact_query.as_deref() == Some(query) {
566 optimized
567 } else {
568 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
569 }
570 }
571 } else {
572 // Cache miss — parse, parameterize, store.
573 let parsed =
574 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
575 let (cached_expr, parameter_count) = if let Some(prepared) =
576 crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
577 {
578 (prepared.shape, prepared.parameter_count)
579 } else {
580 (parsed.clone(), 0)
581 };
582 {
583 let mut pc = runtime.inner.query_cache.write();
584 let plan = crate::storage::query::planner::QueryPlan::new(
585 parsed.clone(),
586 cached_expr,
587 Default::default(),
588 );
589 pc.insert(
590 cache_key.clone(),
591 crate::storage::query::planner::CachedPlan::new(plan)
592 .with_shape_key(cache_key.clone())
593 .with_exact_query(query.to_string())
594 .with_parameter_count(parameter_count),
595 );
596 }
597 parsed
598 }
599 };
600
601 // Phase 5 PG parity: substitute any registered view name that
602 // appears in the expression with its stored body. Runs after
603 // parse and before dispatch so the SQL entrypoint gets the
604 // same view resolution `execute_query_expr` already does.
605 let expr = runtime.rewrite_view_refs(expr);
606
607 Ok(PreparedStatement { expr, mode })
608 }
609
610 pub(super) fn check_query_privilege(
611 &self,
612 runtime: &RedDBRuntime,
613 expr: &QueryExpr,
614 ) -> RedDBResult<()> {
615 // Frame-level coarse gate. We consult `required_privilege()`
616 // (computed once at frame-build) against the captured identity
617 // before the deep grant engine walks the parsed expression.
618 // The coarse gate cannot ALLOW anything the grant engine would
619 // deny — it only short-circuits the obvious "Role::Read tries
620 // INSERT" case so a downstream caller never has to redo this
621 // check inline. `Privilege::None` (transaction control / SET /
622 // SHOW) flows through unchanged; the grant engine treats those
623 // as bypass too.
624 if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
625 let needed = <Self as ReadFrame>::required_privilege(self);
626 if !needed.is_satisfied_by(role) {
627 // Issue #205 — when the deep grant engine *also*
628 // denies, we treat this as an ordinary permission
629 // failure. But when an Admin-only statement reaches
630 // this gate without an auth_store wired (so the deep
631 // engine can't double-check), the coarse rejection is
632 // the only line of defence — emit an OperatorEvent so
633 // the operator notices an Admin-class statement was
634 // attempted with insufficient role.
635 if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
636 crate::telemetry::operator_event::OperatorEvent::AuthBypass {
637 principal: username.to_string(),
638 resource: format!("statement requiring {needed:?}"),
639 detail: format!(
640 "auth_store not wired; coarse gate is sole defence (role={role:?})"
641 ),
642 }
643 .emit_global();
644 }
645 return Err(RedDBError::Query(format!(
646 "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
647 )));
648 }
649 }
650 runtime
651 .check_query_privilege(expr)
652 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
653 }
654
655 pub(super) fn prepare_dispatch(
656 &self,
657 runtime: &RedDBRuntime,
658 expr: &QueryExpr,
659 ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
660 runtime.validate_model_operations_before_auth(expr)?;
661 self.check_query_privilege(runtime, expr)?;
662 Ok(self.acquire_intent_locks(runtime, expr))
663 }
664
665 pub(super) fn acquire_intent_locks(
666 &self,
667 runtime: &RedDBRuntime,
668 expr: &QueryExpr,
669 ) -> Option<crate::runtime::locking::LockerGuard> {
670 if !runtime.config_bool("concurrency.locking.enabled", true) {
671 return None;
672 }
673 // Frame-level short-circuit: if the statement carries no lock
674 // intent (transaction control, SET, SHOW), skip the lock
675 // manager entirely instead of letting `intent_lock_modes_for`
676 // walk the parsed expression to reach the same conclusion.
677 if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
678 return None;
679 }
680 intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
681 let mut guard =
682 crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
683 let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
684 for collection in collections_referenced(expr) {
685 let _ = guard.acquire(
686 crate::runtime::locking::Resource::Collection(collection),
687 coll_mode,
688 );
689 }
690 guard
691 })
692 }
693}
694
695impl ReadFrame for StatementExecutionFrame {
696 fn effective_scope(&self) -> Option<&str> {
697 self.effective_scope.as_deref()
698 }
699
700 fn identity(&self) -> Option<(&str, Role)> {
701 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
702 }
703
704 fn snapshot(&self) -> &Snapshot {
705 &self.snapshot
706 }
707
708 fn as_of_floor(&self) -> Option<Xid> {
709 self.as_of_floor
710 }
711
712 fn cache_key(&self) -> &str {
713 &self.cache_key
714 }
715
716 fn should_cache_result(&self) -> bool {
717 !self.is_volatile_query && self.cache_safe
718 }
719
720 fn required_privilege(&self) -> Privilege {
721 self.required_privilege
722 }
723
724 fn lock_intent(&self) -> LockIntent {
725 self.lock_intent
726 }
727
728 fn visible_collections(&self) -> Option<&HashSet<String>> {
729 self.visible_collections.as_ref()
730 }
731}
732
/// Lightweight `ReadFrame` carrier used by AI command entry points
/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
///
/// Issue #119 calls this struct `EffectiveScope`. It bundles the
/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
/// instead of re-reading thread-locals at every call site.
///
/// Built via `RedDBRuntime::ai_scope()`. That function:
/// * sources tenant + identity from the per-statement thread-locals,
///   using the same `current_tenant()` / `current_auth_identity()` calls
///   as `StatementExecutionFrame::build`;
/// * takes the snapshot from `RedDBRuntime::current_snapshot()`;
/// * resolves `visible_collections` via the `AuthStore`.
pub struct EffectiveScope {
    /// Effective tenant scope; `None` when no tenant is bound.
    pub(crate) tenant: Option<String>,
    /// `(username, role)` of the authenticated caller, if any.
    pub(crate) identity: Option<(String, Role)>,
    /// MVCC snapshot the AI command reads against.
    pub(crate) snapshot: Snapshot,
    /// Collections visible to the `(tenant, role)` scope. `None` when no
    /// auth store is wired or no identity was captured, which AI search
    /// treats as deny-default.
    pub(crate) visible_collections: Option<HashSet<String>>,
}
751
752impl EffectiveScope {
753 /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
754 /// to gate LLM-backed NER calls behind `ai:ner:read`.
755 ///
756 /// Placeholder for now: always returns `false`. The auth engine's
757 /// capability matrix is future work; until it lands, every routed
758 /// LLM-NER call denies at the gate and `extract_tokens_routed`'s
759 /// heuristic fallback fires (see `ask_pipeline::extract_tokens_routed`).
760 /// Documented in code so the wire-up is a one-line change once
761 /// the auth engine learns capabilities.
762 pub fn has_capability(&self, _capability: &str) -> bool {
763 false
764 }
765}
766
767impl ReadFrame for EffectiveScope {
768 fn effective_scope(&self) -> Option<&str> {
769 self.tenant.as_deref()
770 }
771 fn identity(&self) -> Option<(&str, Role)> {
772 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
773 }
774 fn snapshot(&self) -> &Snapshot {
775 &self.snapshot
776 }
777 fn as_of_floor(&self) -> Option<Xid> {
778 None
779 }
780 fn cache_key(&self) -> &str {
781 ""
782 }
783 fn should_cache_result(&self) -> bool {
784 false
785 }
786 fn required_privilege(&self) -> Privilege {
787 Privilege::Read
788 }
789 fn lock_intent(&self) -> LockIntent {
790 LockIntent::Shared
791 }
792 fn visible_collections(&self) -> Option<&HashSet<String>> {
793 self.visible_collections.as_ref()
794 }
795}
796
797impl RedDBRuntime {
798 /// Build the AI command `EffectiveScope` from the current
799 /// statement thread-locals + auth store.
800 ///
801 /// Returns `None` for embedded callers (no auth store, no
802 /// identity) — `AuthorizedSearch` treats `None` as deny-default.
803 pub(crate) fn ai_scope(&self) -> EffectiveScope {
804 let tenant = super::impl_core::current_tenant();
805 let identity = super::impl_core::current_auth_identity();
806 let snapshot = self.current_snapshot();
807 let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
808 (Some(store), Some((principal, role))) => {
809 let collections = self.inner.db.store().list_collections();
810 Some(store.visible_collections_for_scope(
811 tenant.as_deref(),
812 *role,
813 principal,
814 &collections,
815 ))
816 }
817 _ => None,
818 };
819 EffectiveScope {
820 tenant,
821 identity,
822 snapshot,
823 visible_collections,
824 }
825 }
826}
827
/// Test fixtures for callers that want a `ReadFrame` without booting a
/// runtime. Compiled only under `cfg(test)` and exported `pub(crate)`, so
/// it is visible across modules inside this crate and nowhere else.
#[cfg(test)]
pub(crate) mod test_support {
    use super::{LockIntent, Privilege, ReadFrame};
    use crate::auth::Role;
    use crate::storage::transaction::snapshot::{Snapshot, Xid};
    use std::collections::HashSet;

    /// `ReadFrame` whose every field is set by hand. `authorized_search`
    /// tests use it to check deny-default and scope trimming without
    /// building a real statement frame.
    pub(crate) struct FakeReadFrame {
        pub tenant: Option<String>,
        pub identity: Option<(String, Role)>,
        pub snapshot: Snapshot,
        pub visible: Option<HashSet<String>>,
    }

    impl FakeReadFrame {
        /// Frame with no tenant, no identity and no visibility set, reading
        /// at xid 0 with nothing in progress.
        pub(crate) fn without_scope() -> Self {
            let snapshot = Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            };
            Self {
                tenant: None,
                identity: None,
                snapshot,
                visible: None,
            }
        }

        /// Frame scoped to tenant `acme` and identity `alice` (`Role::Read`),
        /// seeing exactly `visible`. The snapshot matches `without_scope`.
        pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
            Self {
                tenant: Some("acme".to_string()),
                identity: Some(("alice".to_string(), Role::Read)),
                visible: Some(visible),
                ..Self::without_scope()
            }
        }
    }

    impl ReadFrame for FakeReadFrame {
        fn effective_scope(&self) -> Option<&str> {
            self.tenant.as_ref().map(String::as_str)
        }

        fn identity(&self) -> Option<(&str, Role)> {
            let (user, role) = self.identity.as_ref()?;
            Some((user.as_str(), *role))
        }

        fn snapshot(&self) -> &Snapshot {
            &self.snapshot
        }

        fn as_of_floor(&self) -> Option<Xid> {
            None
        }

        fn cache_key(&self) -> &str {
            ""
        }

        fn should_cache_result(&self) -> bool {
            false
        }

        fn required_privilege(&self) -> Privilege {
            Privilege::Read
        }

        fn lock_intent(&self) -> LockIntent {
            LockIntent::Shared
        }

        fn visible_collections(&self) -> Option<&HashSet<String>> {
            self.visible.as_ref()
        }
    }
}
904
905impl RedDBRuntime {
906 fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
907 let mut set = HashSet::new();
908 if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
909 set.insert(ctx.xid);
910 for (_, sub) in &ctx.savepoints {
911 set.insert(*sub);
912 }
913 for sub in &ctx.released_sub_xids {
914 set.insert(*sub);
915 }
916 }
917 set
918 }
919
920 /// Resolve the snapshot for the current statement, returning
921 /// the snapshot itself and (when AS OF is in effect) the
922 /// resolved xid floor. The floor is the same xid carried inside
923 /// `Snapshot.xid` for AS OF reads — exposing it separately lets
924 /// the `ReadFrame` Interface tell "live read" from "historical
925 /// read" without inferring from `in_progress.is_empty()`.
926 fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
927 match peek_top_level_as_of_with_table(query) {
928 Some((spec, Some(table))) => {
929 if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
930 return Err(RedDBError::InvalidConfig(format!(
931 "AS OF requires a versioned collection — \
932 `{table}` has not opted in. \
933 Call vcs.set_versioned(\"{table}\", true) first."
934 )));
935 }
936 let xid = self.vcs_resolve_as_of(spec)?;
937 Ok((
938 Snapshot {
939 xid,
940 in_progress: HashSet::new(),
941 },
942 Some(xid),
943 ))
944 }
945 Some((spec, None)) => {
946 let xid = self.vcs_resolve_as_of(spec)?;
947 Ok((
948 Snapshot {
949 xid,
950 in_progress: HashSet::new(),
951 },
952 Some(xid),
953 ))
954 }
955 None => Ok((self.current_snapshot(), None)),
956 }
957 }
958
959 fn result_cache_safe(&self, conn_id: u64) -> bool {
960 let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
961 let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
962 !has_active_xids && !in_own_tx
963 }
964}
965
/// Reports whether a result's `engine` tag names one of the two
/// graph-analytics TVF executors (issue #802).
///
/// Both the graph-collection form (`louvain(g)`) and the inline form
/// (`louvain(nodes => …, edges => …)`) produce deterministic algorithm
/// output, which is cached regardless of row count.
fn is_graph_tvf_engine(engine: &str) -> bool {
    ["runtime-graph-tvf", "runtime-graph-tvf-inline"].contains(&engine)
}
973
974fn result_cache_key(query: &str) -> String {
975 let tenant = current_tenant().unwrap_or_default();
976 let auth = current_auth_identity()
977 .map(|(user, role)| format!("{}|{:?}", user, role))
978 .unwrap_or_default();
979 if tenant.is_empty() && auth.is_empty() {
980 query.to_string()
981 } else {
982 format!("{query}\u{001e}{tenant}\u{001e}{auth}")
983 }
984}
985
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::runtime::impl_core::{
        clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
        set_current_tenant,
    };
    use crate::runtime::RedDBRuntime;

    /// Build a brand-new in-memory runtime so no test observes caches or
    /// collections created by another test.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }

    /// Clear the ambient tenant + auth-identity thread-locals.
    ///
    /// Every test calls this first, and tests that set scope call it again
    /// at the end. That way a test never inherits tenant/identity state left
    /// on its thread by earlier code.
    fn reset_thread_locals() {
        clear_current_tenant();
        clear_current_auth_identity();
    }

    /// An autocommit SELECT without AS OF gets a live snapshot: no AS OF
    /// floor and a snapshot xid of at least 1.
    #[test]
    fn autocommit_select_takes_live_snapshot() {
        reset_thread_locals();
        let rt = fresh_runtime();
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");

        // Live reads: no AS OF floor, snapshot bounded by the
        // manager's `peek_next_xid` so committed tuples are visible.
        let f: &dyn ReadFrame = &frame;
        assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
        assert!(
            f.snapshot().xid >= 1,
            "autocommit snapshot xid is bounded by peek_next_xid"
        );
    }

    /// A frame built after setting tenant + identity thread-locals exposes
    /// both through the `ReadFrame` Interface and folds both into its
    /// cache key.
    #[test]
    fn frame_captures_identity_and_scope() {
        reset_thread_locals();
        set_current_tenant("acme".to_string());
        set_current_auth_identity("alice".to_string(), Role::Write);

        let rt = fresh_runtime();
        let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
        let f: &dyn ReadFrame = &frame;

        assert_eq!(f.effective_scope(), Some("acme"));
        let id = f.identity().expect("identity captured");
        assert_eq!(id.0, "alice");
        assert!(matches!(id.1, Role::Write));

        // Cache key mixes scope + identity so two callers under
        // different tenants never share a cache slot.
        assert!(
            f.cache_key().contains("acme") && f.cache_key().contains("alice"),
            "cache key folds in scope + identity, got {:?}",
            f.cache_key()
        );

        reset_thread_locals();
    }

    /// AS OF against a user collection that has not opted into versioning
    /// must fail at frame-build time with the "AS OF requires a versioned
    /// collection" error.
    #[test]
    fn as_of_rejects_non_versioned_user_collection() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // `not_versioned` is a plain user collection — the frame
        // builder must reject AS OF until the caller opts in via
        // `vcs.set_versioned`.
        let err = match StatementExecutionFrame::build(
            &rt,
            "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
        ) {
            Err(e) => e,
            Ok(_) => panic!("AS OF on non-versioned user collection rejected"),
        };

        let msg = format!("{err}");
        assert!(
            msg.contains("AS OF requires a versioned collection"),
            "expected AS OF rejection, got: {msg}"
        );
    }

    /// End-to-end proof that the SELECT path consumes a `ReadFrame`.
    ///
    /// Sets a tenant + identity via the public thread-local API the
    /// runtime uses for ambient scope, drives a real `SELECT` through
    /// `execute_query`, then inspects the result cache that the SELECT
    /// path populates via `frame.cache_key()`. The key only carries
    /// the tenant + identity *because* it was built through the frame —
    /// reverting the wiring to inline `current_tenant()` /
    /// `current_auth_identity()` reads would still pass this test, but
    /// dropping the frame entirely (so the SELECT path stopped touching
    /// `cache_key`) would break it.
    #[test]
    fn select_path_routes_through_frame_cache_key() {
        reset_thread_locals();
        set_current_tenant("acme".to_string());
        set_current_auth_identity("alice".to_string(), Role::Read);

        let rt = fresh_runtime();
        let result = rt
            .execute_query("SELECT 1")
            .expect("SELECT 1 executes under tenant=acme/identity=alice");
        assert_eq!(result.statement_type, "select");

        // The textual SELECT path builds a frame and
        // writes its result through `frame.cache_key()`. That key folds
        // tenant + identity in via `result_cache_key`, so finding "acme"
        // and "alice" inside any cached key proves the frame was the
        // seam used.
        let cache = rt.inner.result_cache.read();
        let any_keyed_with_scope = cache
            .0
            .keys()
            .any(|k| k.contains("acme") && k.contains("alice"));
        assert!(
            any_keyed_with_scope,
            "expected at least one result-cache key carrying tenant+identity, \
             got keys: {:?}",
            cache.0.keys().collect::<Vec<_>>()
        );

        reset_thread_locals();
    }

    /// A SELECT that calls a volatile builtin (here:
    /// `pg_advisory_unlock`, the volatile token the runtime currently
    /// recognises in `query_has_volatile_builtin`) must NOT populate
    /// the result cache. Any caller hitting the cache after this would
    /// see a stale answer for an inherently-volatile query, so the
    /// SELECT path gates writes through `frame.should_cache_result()`.
    ///
    /// Deletion test: removing `ReadFrame::should_cache_result`, or
    /// reverting the SELECT path to skip its safety gate, would let
    /// the result cache silently absorb this statement and break the
    /// assertion below.
    #[test]
    fn volatile_select_does_not_populate_result_cache() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // Frame-level invariant: the volatile-builtin signal collapses
        // `should_cache_result` to false even for an autocommit /
        // out-of-tx connection.
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
        let f: &dyn ReadFrame = &frame;
        assert!(
            !f.should_cache_result(),
            "volatile builtin must disable result-cache safety"
        );

        // End-to-end: drive the volatile SELECT through `execute_query`
        // and confirm no entry was stamped under its cache key. We assert
        // on this query's exact key rather than on an empty cache, so
        // unrelated entries cannot cause a false failure.
        let _ = rt
            .execute_query("SELECT pg_advisory_unlock(1)")
            .expect("volatile SELECT executes");
        let cache = rt.inner.result_cache.read();
        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
        assert!(
            !cache.0.contains_key(&key),
            "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
            cache.0.keys().collect::<Vec<_>>()
        );

        reset_thread_locals();
    }

    /// With `runtime.result_cache.backend = "blob_cache"`, a cacheable
    /// SELECT lands in the blob cache and its entry index, and NOT in the
    /// legacy `result_cache` map.
    #[test]
    fn blob_cache_backend_populates_blob_path_without_legacy_write() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
        assert_eq!(result.statement_type, "select");

        let key = result_cache_key("SELECT 1");
        assert!(
            rt.inner
                .result_blob_cache
                .get("runtime.result_cache", &key)
                .is_some(),
            "blob backend should stamp the Blob Cache path"
        );
        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
        assert!(
            !rt.inner.result_cache.read().0.contains_key(&key),
            "blob backend should not write the legacy map"
        );
    }

    /// The blob backend honours the same volatile-builtin gate as the
    /// legacy map: a volatile SELECT reaches neither the blob cache nor
    /// its entry index.
    #[test]
    fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        let _ = rt
            .execute_query("SELECT pg_advisory_unlock(1)")
            .expect("volatile SELECT executes");
        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
        assert!(
            rt.inner
                .result_blob_cache
                .get("runtime.result_cache", &key)
                .is_none(),
            "volatile SELECT must not populate blob result cache"
        );
        assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
    }

    /// The `shadow` backend writes both the legacy map and the blob entry
    /// index. Repeating an identical SELECT must report zero shadow
    /// divergences. The test also pins the divergence metric's name.
    #[test]
    fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));

        let first = rt.execute_query("SELECT 1").expect("first SELECT");
        let second = rt.execute_query("SELECT 1").expect("cached SELECT");
        assert_eq!(first.result.len(), second.result.len());

        let key = result_cache_key("SELECT 1");
        assert!(rt.inner.result_cache.read().0.contains_key(&key));
        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
        assert_eq!(rt.result_cache_shadow_divergences(), 0);
        assert_eq!(
            crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
            "cache_shadow_divergence_total"
        );
    }

    /// AS OF on a `red_*` collection skips the versioning opt-in check
    /// and records the resolved xid as both the floor and the snapshot xid.
    #[test]
    fn as_of_on_red_collection_records_floor() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // `red_*` collections always allow AS OF. The frame should
        // resolve to a concrete xid and surface it via the Interface.
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
                .expect("AS OF SNAPSHOT 1 on red_commits resolves");

        let f: &dyn ReadFrame = &frame;
        assert_eq!(
            f.as_of_floor(),
            Some(1),
            "AS OF SNAPSHOT 1 records xid=1 as the floor"
        );
        assert_eq!(f.snapshot().xid, 1);
        assert!(
            f.snapshot().in_progress.is_empty(),
            "historical reads have no in-progress set"
        );
    }

    /// The frame classifies common SQL prefixes into the coarse
    /// `Privilege` / `LockIntent` buckets at build time. This test
    /// pins the mapping so a regression that silently re-routes
    /// (e.g. INSERT classified as Read) surfaces here, not at a
    /// downstream privilege gate.
    #[test]
    fn frame_classifies_privilege_and_lock_intent_from_prefix() {
        reset_thread_locals();
        let rt = fresh_runtime();

        let cases = [
            ("SELECT 1", Privilege::Read, LockIntent::Shared),
            ("LIST KV settings", Privilege::Read, LockIntent::Shared),
            (
                "KV GET settings.feature",
                Privilege::Read,
                LockIntent::Shared,
            ),
            ("VAULT LIST secrets", Privilege::Read, LockIntent::Shared),
            (
                "INSERT INTO t (id) VALUES (1)",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "KV PUT settings.feature = 'on'",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "VAULT PUT secrets.api = 'x'",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "UPDATE t SET x = 1 WHERE id = 1",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "DELETE FROM t WHERE id = 1",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "CREATE TABLE foo (id INT)",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            ("BEGIN", Privilege::None, LockIntent::None),
            ("COMMIT", Privilege::None, LockIntent::None),
            ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
        ];

        for (q, want_priv, want_lock) in cases {
            let frame = StatementExecutionFrame::build(&rt, q)
                .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
            let f: &dyn ReadFrame = &frame;
            assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
            assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
        }
    }

    /// Deletion-test for `ReadFrame::required_privilege`: a statement
    /// whose frame requires more privilege than the caller's role
    /// provides is denied with the frame's signal.
    ///
    /// We test the gate by classifying an INSERT (which the frame
    /// reports as `Privilege::Write`) under `Role::Read` — the only
    /// pair the legacy fallback would also reject, but here the
    /// rejection comes through `frame.check_query_privilege` BEFORE
    /// the parsed-expression walker runs. Removing
    /// `required_privilege` (or the `is_satisfied_by` consult inside
    /// `check_query_privilege`) would force the deny path back to the
    /// inline `RedDBRuntime::check_query_privilege` walker — but the
    /// auth_store gate up there is bypassed when no auth_store is
    /// wired (embedded test mode), so this test would FLIP from
    /// denied to permitted and break the assertion below.
    #[test]
    fn insert_under_read_role_denied_via_frame_privilege() {
        reset_thread_locals();
        set_current_auth_identity("alice".to_string(), Role::Read);

        let rt = fresh_runtime();
        // Bypass parser by reaching into the frame directly: the
        // frame derives privilege from the SQL prefix without
        // needing an auth_store wired up. Driving end-to-end via
        // `execute_query` would also reject (no table `t`), but for
        // a different reason — we want to pin the privilege seam.
        let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
            .expect("frame builds for INSERT");
        let f: &dyn ReadFrame = &frame;
        assert_eq!(
            f.required_privilege(),
            Privilege::Write,
            "INSERT classified as Write"
        );
        let id = f.identity().expect("identity captured");
        assert!(
            !f.required_privilege().is_satisfied_by(id.1),
            "Role::Read does not satisfy Privilege::Write — frame must deny"
        );

        // End-to-end: the frame's `check_query_privilege` sees the
        // (Read role, Write privilege) mismatch and denies before
        // dispatch. We drive a synthetic `QueryExpr::Table` because
        // the SELECT/INSERT parser would happen to also fail, and we
        // want the failure to come from the privilege seam.
        use crate::storage::query::ast::{QueryExpr, TableQuery};
        let expr = QueryExpr::Table(TableQuery::new("t"));
        let err = frame
            .check_query_privilege(&rt, &expr)
            .expect_err("denied via frame's coarse privilege gate");
        let msg = format!("{err}");
        assert!(
            msg.contains("permission denied") && msg.contains("Write"),
            "expected frame-level Write deny, got: {msg}"
        );

        reset_thread_locals();
    }

    /// End-to-end proof that the frame-owned row-buffer arena (#885) is
    /// wired into the SELECT path and produces observable results
    /// byte-identical to the per-request-allocation baseline.
    ///
    /// A table with more rows than the streaming high-water mark
    /// (`DEFAULT_HIGH_WATER_MARK`) forces the `execute_runtime_table_query_in`
    /// path to assemble many chunks, each leasing/recycling the frame
    /// arena's single chunk buffer. Driving it through `execute_query`
    /// (which builds a `StatementExecutionFrame` and lends its arena)
    /// must return every inserted row, in order — exactly what the
    /// allocate-per-chunk path returned. A bug in the arena wiring
    /// (dropped rows, bled rows, mis-ordering) would surface here.
    #[test]
    fn large_select_through_frame_arena_returns_all_rows_in_order() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE big (id INT)")
            .expect("create table");

        // > DEFAULT_HIGH_WATER_MARK (1024) rows so the streaming channel
        // spans multiple chunks and the arena buffer is reused.
        const N: usize = 2_500;
        // Insert in batches of 250 rows per statement.
        for start in (0..N).step_by(250) {
            let end = (start + 250).min(N);
            let values = (start..end)
                .map(|i| format!("({i})"))
                .collect::<Vec<_>>()
                .join(", ");
            rt.execute_query(&format!("INSERT INTO big (id) VALUES {values}"))
                .unwrap_or_else(|err| panic!("insert rows {start}..{end}: {err:?}"));
        }

        let result = rt
            .execute_query("SELECT id FROM big ORDER BY id")
            .expect("large SELECT executes through the frame arena path");
        assert_eq!(result.statement_type, "select");
        assert_eq!(
            result.result.records.len(),
            N,
            "every inserted row streams back through the arena-backed channel"
        );
        for (i, record) in result.result.records.iter().enumerate() {
            assert_eq!(
                record.get("id"),
                Some(&crate::storage::schema::Value::Integer(i as i64)),
                "row {i} is byte-identical to the per-request-allocation baseline"
            );
        }

        reset_thread_locals();
    }

    /// Transport adapters may decode their wire-specific parameter value
    /// shapes, but SQL parsing/binding must stay behind the runtime's
    /// statement entrypoint. This pins the deeper seam introduced for
    /// parameterized query execution: HTTP, JSON-RPC, RedWire, PG wire,
    /// and gRPC all call `RedDBRuntime::execute_query_with_params`, which
    /// installs a real `StatementExecutionFrame` before dispatch.
    ///
    /// The check is textual: each adapter source file must mention
    /// `execute_query_with_params` and must not mention `user_params::bind`.
    #[test]
    fn parameterized_transport_adapters_delegate_binding_to_runtime() {
        let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
        let adapters = [
            "src/server/handlers_query.rs",
            "src/rpc_stdio.rs",
            "src/wire/redwire/session.rs",
            "src/wire/postgres/server.rs",
            "src/grpc.rs",
        ];

        for relative in adapters {
            let path = manifest_dir.join(relative);
            let text = std::fs::read_to_string(&path)
                .unwrap_or_else(|err| panic!("read {}: {err}", path.display()));
            assert!(
                text.contains("execute_query_with_params"),
                "{relative} should delegate parameterized query execution to the runtime"
            );
            assert!(
                !text.contains("user_params::bind"),
                "{relative} must not bind SQL params in the transport adapter"
            );
        }
    }

    /// Deletion-test for `ReadFrame::lock_intent`: a transaction
    /// control statement carries `LockIntent::None` and the
    /// `acquire_intent_locks` path returns `None` without consulting
    /// `intent_lock_modes_for`. Removing the method (or its consult
    /// site in `acquire_intent_locks`) would force the lock-mode
    /// helper to walk a fabricated parsed expression to reach the
    /// same conclusion — but the assertion that no guard is allocated
    /// for a `BEGIN` frame would still hold, so we additionally pin
    /// the classifier mapping above to make the deletion observable.
    #[test]
    fn control_statement_skips_intent_locks_via_frame() {
        reset_thread_locals();
        let rt = fresh_runtime();

        let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
        let f: &dyn ReadFrame = &frame;
        assert_eq!(f.lock_intent(), LockIntent::None);

        // Drive `acquire_intent_locks` against a fabricated SELECT
        // expression that WOULD normally yield `(IS, IS)`; the frame's
        // `lock_intent() == None` short-circuit must still suppress
        // the guard.
        use crate::storage::query::ast::{QueryExpr, TableQuery};
        let expr = QueryExpr::Table(TableQuery::new("t"));
        let guard = frame.acquire_intent_locks(&rt, &expr);
        assert!(
            guard.is_none(),
            "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
        );
    }
}