Skip to main content

reddb_server/runtime/
execution_context.rs

1//! Per-thread runtime execution context.
2//!
3//! This module owns connection identity, auth/tenant scope, statement-local
4//! config/secret resolvers, and MVCC snapshot visibility helpers.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use crate::api::{RedDBError, RedDBResult};
10use crate::storage::schema::Value;
11use crate::storage::RedDB;
12
13thread_local! {
14    /// Current connection id for the executing statement. Set by the
15    /// per-connection wrapper (stdio/gRPC handlers) before dispatching
16    /// into `execute_query`; falls back to `0` for embedded callers.
17    static CURRENT_CONN_ID: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
18
19    /// Authenticated user + role for the executing statement (Phase 2.5.2
20    /// RLS enforcement). Set by the transport middleware after validating
21    /// credentials (password / cert / oauth); unset means "anonymous" /
22    /// "embedded" — RLS policies degrade to the role-agnostic subset.
23    ///
24    /// `None` skips RLS injection entirely; `Some((username, role))`
25    /// passes `role` to `matching_rls_policies(table, Some(role), action)`.
26    static CURRENT_AUTH_IDENTITY: std::cell::RefCell<Option<(String, crate::auth::Role)>> =
27        const { std::cell::RefCell::new(None) };
28
29    /// MVCC snapshot scoped to the currently-executing statement (Phase
30    /// 2.3.2d PG parity). `execute_query` captures it on entry and drops
31    /// it on exit; every scan consults it via
32    /// `entity_visible_under_current_snapshot` to hide tuples whose xmin
33    /// hasn't committed or whose xmax already has.
34    ///
35    /// `None` means "pre-MVCC semantics" — the read path returns every
36    /// tuple regardless of xmin/xmax. All embedded callers that bypass
37    /// `execute_query` see this default.
38    static CURRENT_SNAPSHOT: std::cell::RefCell<Option<SnapshotContext>> =
39        const { std::cell::RefCell::new(None) };
40
41    /// Cheap presence flag for `CURRENT_SNAPSHOT`. Scan hot paths
42    /// poll this instead of `borrow()`-ing the RefCell on every
43    /// row — the common case (autocommit / no MVCC session) reads
44    /// one atomic `Cell<bool>` and short-circuits, saving ~10ns × N
45    /// rows on aggregate_group / select_range scans.
46    static HAS_SNAPSHOT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
47
48    /// Session-scoped tenant id for the current connection (Phase 2.5.3
49    /// multi-tenancy). Populated by `SET TENANT 'id'` or by transport
50    /// middleware after resolving tenant from auth claims. Read by the
51    /// `CURRENT_TENANT()` scalar function — RLS policies typically
52    /// combine it as `USING (tenant_id = CURRENT_TENANT())` to scope
53    /// every query to one tenant.
54    ///
55    /// `None` means "no tenant bound" — `CURRENT_TENANT()` returns
56    /// NULL, and RLS policies that gate on it hide every row.
57    static CURRENT_TENANT_ID: std::cell::RefCell<Option<String>> =
58        const { std::cell::RefCell::new(None) };
59
60    /// Statement-local config resolver. SQL expressions materialize the
61    /// `red_config` snapshot lazily on the first `$config.*`/`CONFIG()`
62    /// access, keeping ordinary statements on the zero-scan path.
63    static CURRENT_CONFIG_RESOLVER: std::cell::RefCell<Option<ConfigResolver>> =
64        const { std::cell::RefCell::new(None) };
65
66    /// Statement-local secret resolver. SQL expressions materialize the
67    /// vault KV snapshot lazily on first `$secret.*` access, then use
68    /// lock-free map reads for the rest of the statement.
69    static CURRENT_SECRET_RESOLVER: std::cell::RefCell<Option<SecretResolver>> =
70        const { std::cell::RefCell::new(None) };
71}
72
73/// Snapshot + manager pair used for read-path visibility checks.
74///
75/// The manager is needed in addition to the snapshot because `aborted`
76/// state mutates after the snapshot is captured — a ROLLBACK by a
77/// committed-at-capture-time writer must still hide its tuples. Keeping
78/// the Arc around is O(pointer) and the RwLock reads on `is_aborted`
79/// are cheap (HashSet lookup under a parking_lot read guard).
80///
81/// `own_xids` (Phase 2.3.2e) lists the xids belonging to the current
82/// connection's transaction — the parent xid plus open and released
83/// savepoint sub-xids. The visibility rule promotes rows stamped with
84/// these xids to "always visible (unless aborted)" so the writer sees
85/// its own nested-savepoint writes even though their xids exceed
86/// `snapshot.xid`.
87#[derive(Clone)]
88pub struct SnapshotContext {
89    pub snapshot: crate::storage::transaction::snapshot::Snapshot,
90    pub manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
91    pub own_xids: std::collections::HashSet<crate::storage::transaction::snapshot::Xid>,
92    pub requires_index_fallback: bool,
93}
94
95/// Install a connection id on the current thread for the duration of a
96/// statement. Transaction state (`RuntimeInner::tx_contexts`) is keyed
97/// by this id so different connections can hold independent BEGINs.
98///
99/// Pub so transports (PG wire, gRPC, HTTP per-request spawners) and
100/// tests can emulate per-connection isolation. Call it once when
101/// binding the connection's worker thread; pair with
102/// `clear_current_connection_id` on teardown.
103pub fn set_current_connection_id(id: u64) {
104    CURRENT_CONN_ID.with(|c| c.set(id));
105}
106
107/// Reset the thread's connection id back to `0` (autocommit).
108pub fn clear_current_connection_id() {
109    CURRENT_CONN_ID.with(|c| c.set(0));
110}
111
112/// Read the connection id set by `set_current_connection_id`. Returns
113/// `0` when no wrapper installed one — auto-commit path.
114pub fn current_connection_id() -> u64 {
115    CURRENT_CONN_ID.with(|c| c.get())
116}
117
118/// Install the authenticated identity for the current thread (Phase 2.5.2
119/// RLS enforcement). Transport layers call this right after resolving
120/// auth so the query dispatch can fold RLS policies into the filter.
121pub fn set_current_auth_identity(username: String, role: crate::auth::Role) {
122    CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = Some((username, role)));
123}
124
125/// Clear the thread-local auth identity. Transports call this after the
126/// statement completes so pooled threads don't leak identities across
127/// requests.
128pub fn clear_current_auth_identity() {
129    CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = None);
130}
131
132/// Read the current-thread auth identity. `None` when no transport
133/// installed one (embedded mode / anonymous access).
134pub(crate) fn current_auth_identity() -> Option<(String, crate::auth::Role)> {
135    CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone())
136}
137
138/// Public probe of the thread-local auth identity for callers outside
139/// the `runtime` module (e.g. the AI credential resolver, which audits
140/// who triggered a secret read on behalf of a query).
141pub fn current_auth_identity_for_audit() -> Option<(String, crate::auth::Role)> {
142    current_auth_identity()
143}
144
145/// Install the session tenant id for the current thread (Phase 2.5.3
146/// multi-tenancy). Called by `SET TENANT 'id'` dispatch and by
147/// transport middleware that resolves tenant from auth claims (e.g.
148/// JWT `tenant` claim, HTTP header, subdomain).
149pub fn set_current_tenant(tenant_id: String) {
150    CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = Some(tenant_id));
151}
152
153/// Clear the current-thread tenant — `CURRENT_TENANT()` will then
154/// return NULL and any RLS policy gated on it will hide every row.
155pub fn clear_current_tenant() {
156    CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = None);
157}
158
159/// Read the current-thread tenant id, applying overrides in priority order:
160///   1. `WITHIN TENANT '<id>' …` per-statement override (highest)
161///   2. `SET LOCAL TENANT '<id>'` transaction-local override (consulted
162///      only when the current connection has an open transaction)
163///   3. `SET TENANT '<id>'` session-level thread-local
164///   4. `None` (deny-default for RLS).
165///
166/// The transaction-local layer is read through the runtime; an embedded
167/// helper crate that has no `RedDBRuntime` access still gets correct
168/// behaviour for layers 1, 3, and 4.
169pub fn current_tenant() -> Option<String> {
170    let inherited = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
171    if let Some(over) = current_scope_override() {
172        if over.tenant.is_active() {
173            return over.tenant.resolve(inherited);
174        }
175    }
176    if let Some(tx_local) = current_tx_local_tenant() {
177        return tx_local;
178    }
179    inherited
180}
181
thread_local! {
    /// Transaction-local tenant override seen by the `execute_query`
    /// call currently running on this thread (a copy of the active
    /// connection's `tx_local_tenants` entry).
    ///
    /// Encoding:
    /// * outer `None` — no transaction-local override applies;
    /// * `Some(Some(s))` — the tenant is overridden to `s`;
    /// * `Some(None)` — the tenant is overridden to NULL / cleared.
    ///
    /// The value is installed at the top of every `execute_query`
    /// invocation through `TxLocalTenantGuard`, whose drop resets the
    /// slot so a pooled connection cannot carry the override past the
    /// statement that owns it.
    static TX_LOCAL_TENANT: std::cell::RefCell<Option<Option<String>>> =
        const { std::cell::RefCell::new(None) };
}
194
195fn current_tx_local_tenant() -> Option<Option<String>> {
196    TX_LOCAL_TENANT.with(|cell| cell.borrow().clone())
197}
198
199/// Recognise `SET LOCAL TENANT '<id>'` / `SET LOCAL TENANT NULL` —
200/// returns `Ok(Some(Some(id)))` for an explicit value, `Ok(Some(None))`
201/// for an explicit NULL clear, `Ok(None)` when the input is not a
202/// `SET LOCAL TENANT` statement at all, and `Err` when the prefix
203/// matches but the value is malformed.
204pub(crate) fn parse_set_local_tenant(query: &str) -> RedDBResult<Option<Option<String>>> {
205    let mut tokens = query.split_ascii_whitespace();
206    let Some(w1) = tokens.next() else {
207        return Ok(None);
208    };
209    if !w1.eq_ignore_ascii_case("SET") {
210        return Ok(None);
211    }
212    let Some(w2) = tokens.next() else {
213        return Ok(None);
214    };
215    if !w2.eq_ignore_ascii_case("LOCAL") {
216        return Ok(None);
217    }
218    let Some(w3) = tokens.next() else {
219        return Ok(None);
220    };
221    if !w3.eq_ignore_ascii_case("TENANT") {
222        return Ok(None);
223    }
224    let rest: String = tokens.collect::<Vec<_>>().join(" ");
225    let rest = rest.trim().trim_end_matches(';').trim();
226    let value_str = rest.strip_prefix('=').map(|s| s.trim()).unwrap_or(rest);
227    if value_str.is_empty() {
228        return Err(RedDBError::Query(
229            "SET LOCAL TENANT expects a string literal or NULL".to_string(),
230        ));
231    }
232    if value_str.eq_ignore_ascii_case("NULL") {
233        return Ok(Some(None));
234    }
235    if value_str.starts_with('\'') && value_str.ends_with('\'') && value_str.len() >= 2 {
236        let inner = &value_str[1..value_str.len() - 1];
237        return Ok(Some(Some(inner.to_string())));
238    }
239    Err(RedDBError::Query(format!(
240        "SET LOCAL TENANT expects a string literal or NULL, got `{value_str}`"
241    )))
242}
243
244pub(crate) struct TxLocalTenantGuard;
245
246impl TxLocalTenantGuard {
247    pub fn install(value: Option<Option<String>>) -> Self {
248        TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = value);
249        Self
250    }
251}
252
253impl Drop for TxLocalTenantGuard {
254    fn drop(&mut self) {
255        TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = None);
256    }
257}
258
259thread_local! {
260    /// Stack of `WITHIN ... <stmt>` overrides active on the current
261    /// thread. Every entry corresponds to one in-flight `execute_query`
262    /// call that started with a `WITHIN` prefix; the entry is pushed
263    /// before dispatch and popped before the call returns. The stack
264    /// shape supports nested invocations (e.g. a view body that itself
265    /// re-enters execute_query).
266    static SCOPE_OVERRIDES: std::cell::RefCell<Vec<crate::runtime::within_clause::ScopeOverride>> =
267        const { std::cell::RefCell::new(Vec::new()) };
268}
269
270pub(crate) fn push_scope_override(over: crate::runtime::within_clause::ScopeOverride) {
271    SCOPE_OVERRIDES.with(|cell| cell.borrow_mut().push(over));
272}
273
274pub(crate) fn pop_scope_override() {
275    SCOPE_OVERRIDES.with(|cell| {
276        cell.borrow_mut().pop();
277    });
278}
279
280pub(crate) fn current_scope_override() -> Option<crate::runtime::within_clause::ScopeOverride> {
281    SCOPE_OVERRIDES.with(|cell| cell.borrow().last().cloned())
282}
283
284/// Cheap probe: is any `WITHIN …` scope override active on this
285/// thread? The fast-path needs to know without paying for the full
286/// `.last().cloned()` allocation — just peek at stack length.
287pub(crate) fn has_scope_override_active() -> bool {
288    SCOPE_OVERRIDES.with(|cell| !cell.borrow().is_empty())
289}
290
291/// RAII guard pairing `push_scope_override` with the matching pop, so
292/// the stack stays balanced even when the inner `execute_query` returns
293/// early via `?`.
294pub(crate) struct ScopeOverrideGuard;
295
296impl ScopeOverrideGuard {
297    pub fn install(over: crate::runtime::within_clause::ScopeOverride) -> Self {
298        push_scope_override(over);
299        Self
300    }
301}
302
303impl Drop for ScopeOverrideGuard {
304    fn drop(&mut self) {
305        pop_scope_override();
306    }
307}
308
309/// Read the current-thread auth identity, honouring per-statement
310/// `WITHIN ... USER '<u>' AS ROLE '<r>'` overrides. The override only
311/// supplies projected strings — it never grants additional privilege —
312/// so callers that need to make authorisation decisions must read from
313/// the underlying `current_auth_identity()` directly.
314pub(crate) fn current_user_projected() -> Option<String> {
315    let inherited = current_auth_identity().map(|(u, _)| u);
316    if let Some(over) = current_scope_override() {
317        if over.user.is_active() {
318            return over.user.resolve(inherited);
319        }
320    }
321    inherited
322}
323
324pub(crate) fn current_role_projected() -> Option<String> {
325    let inherited = current_auth_identity().map(|(_, r)| format!("{r:?}").to_lowercase());
326    if let Some(over) = current_scope_override() {
327        if over.role.is_active() {
328            return over.role.resolve(inherited);
329        }
330    }
331    inherited
332}
333
334pub(crate) fn current_secret_value(path: &str) -> Option<String> {
335    let key = path.to_ascii_lowercase();
336    CURRENT_SECRET_RESOLVER.with(|cell| {
337        let mut resolver = cell.borrow_mut();
338        let resolver = resolver.as_mut()?;
339        if resolver.values.is_none() {
340            resolver.values = resolver
341                .store
342                .as_ref()
343                .map(|store| store.vault_kv_snapshot());
344        }
345        let values = resolver.values.as_ref()?;
346        values.get(&key).cloned().or_else(|| {
347            key.strip_prefix("red.vault/").and_then(|rest| {
348                values
349                    .get(rest)
350                    .cloned()
351                    .or_else(|| values.get(&format!("red.secret.{rest}")).cloned())
352            })
353        })
354    })
355}
356
357struct SecretResolver {
358    store: Option<Arc<crate::auth::store::AuthStore>>,
359    values: Option<HashMap<String, String>>,
360}
361
362pub(crate) struct SecretStoreGuard {
363    previous: Option<SecretResolver>,
364}
365
366impl SecretStoreGuard {
367    pub(super) fn install(store: Option<Arc<crate::auth::store::AuthStore>>) -> Self {
368        let previous = CURRENT_SECRET_RESOLVER.with(|cell| {
369            cell.replace(Some(SecretResolver {
370                store,
371                values: None,
372            }))
373        });
374        Self { previous }
375    }
376}
377
378impl Drop for SecretStoreGuard {
379    fn drop(&mut self) {
380        let previous = self.previous.take();
381        CURRENT_SECRET_RESOLVER.with(|cell| {
382            cell.replace(previous);
383        });
384    }
385}
386
387pub(crate) fn current_config_value(path: &str) -> Option<Value> {
388    let key = path.to_ascii_lowercase();
389    CURRENT_CONFIG_RESOLVER.with(|cell| {
390        let mut resolver = cell.borrow_mut();
391        let resolver = resolver.as_mut()?;
392        if resolver.values.is_none() {
393            resolver.values = Some(latest_config_snapshot(&resolver.db));
394        }
395        let values = resolver.values.as_ref()?;
396        values.get(&key).cloned().or_else(|| {
397            key.strip_prefix("red.config/")
398                .and_then(|rest| values.get(&format!("red.config.{rest}")).cloned())
399        })
400    })
401}
402
403pub(crate) fn update_current_config_value(path: &str, value: Value) {
404    let key = path.to_ascii_lowercase();
405    CURRENT_CONFIG_RESOLVER.with(|cell| {
406        if let Some(resolver) = cell.borrow_mut().as_mut() {
407            if let Some(values) = resolver.values.as_mut() {
408                values.insert(key, value);
409            }
410        }
411    });
412}
413
414pub(crate) fn update_current_secret_value(path: &str, value: Option<String>) {
415    let key = path.to_ascii_lowercase();
416    CURRENT_SECRET_RESOLVER.with(|cell| {
417        if let Some(resolver) = cell.borrow_mut().as_mut() {
418            let Some(values) = resolver.values.as_mut() else {
419                return;
420            };
421            match value {
422                Some(value) => {
423                    values.insert(key, value);
424                }
425                None => {
426                    values.remove(&key);
427                }
428            }
429        }
430    });
431}
432
433fn latest_config_snapshot(db: &RedDB) -> HashMap<String, Value> {
434    let mut latest: HashMap<String, (u64, Value)> = HashMap::new();
435
436    if let Some(manager) = db.store().get_collection("red_config") {
437        manager.for_each_entity(|entity| {
438            let Some(row) = entity.data.as_row() else {
439                return true;
440            };
441            let Some(Value::Text(key)) = row.get_field("key") else {
442                return true;
443            };
444            let value = row.get_field("value").cloned().unwrap_or(Value::Null);
445            let id = entity.id.raw();
446            let key = key.to_ascii_lowercase();
447            insert_latest_config_value(&mut latest, key.clone(), id, value.clone());
448            if let Some(rest) = key.strip_prefix("red.config.") {
449                insert_latest_config_value(&mut latest, format!("red.config/{rest}"), id, value);
450            }
451            true
452        });
453    }
454
455    if let Some(manager) = db.store().get_collection("red.config") {
456        manager.for_each_entity(|entity| {
457            let Some(row) = entity.data.as_row() else {
458                return true;
459            };
460            if matches!(row.get_field("tombstone"), Some(Value::Boolean(true))) {
461                return true;
462            }
463            let Some(Value::Text(key)) = row.get_field("key") else {
464                return true;
465            };
466            let value = row.get_field("value").cloned().unwrap_or(Value::Null);
467            insert_latest_config_value(
468                &mut latest,
469                format!("red.config/{}", key.to_ascii_lowercase()),
470                entity.id.raw(),
471                value,
472            );
473            true
474        });
475    }
476
477    latest
478        .into_iter()
479        .map(|(key, (_, value))| (key, value))
480        .collect()
481}
482
483fn insert_latest_config_value(
484    latest: &mut HashMap<String, (u64, Value)>,
485    key: String,
486    id: u64,
487    value: Value,
488) {
489    match latest.get(&key) {
490        Some((prev_id, _)) if *prev_id > id => {}
491        _ => {
492            latest.insert(key, (id, value));
493        }
494    }
495}
496
497struct ConfigResolver {
498    db: Arc<RedDB>,
499    values: Option<HashMap<String, Value>>,
500}
501
502pub(crate) struct ConfigSnapshotGuard {
503    previous: Option<ConfigResolver>,
504}
505
506impl ConfigSnapshotGuard {
507    pub(super) fn install(db: Arc<RedDB>) -> Self {
508        let previous = CURRENT_CONFIG_RESOLVER
509            .with(|cell| cell.replace(Some(ConfigResolver { db, values: None })));
510        Self { previous }
511    }
512}
513
514impl Drop for ConfigSnapshotGuard {
515    fn drop(&mut self) {
516        let previous = self.previous.take();
517        CURRENT_CONFIG_RESOLVER.with(|cell| {
518            cell.replace(previous);
519        });
520    }
521}
522
523/// Install the MVCC snapshot used by the current thread for the duration
524/// of one statement. Paired with `clear_current_snapshot()` — callers
525/// should prefer the `CurrentSnapshotGuard` RAII wrapper so early returns
526/// still clean up.
527pub fn set_current_snapshot(ctx: SnapshotContext) {
528    CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = Some(ctx));
529    HAS_SNAPSHOT.with(|c| c.set(true));
530}
531
532pub fn clear_current_snapshot() {
533    CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = None);
534    HAS_SNAPSHOT.with(|c| c.set(false));
535}
536
537/// Drop-guard that restores the previous snapshot on scope exit. Safe to
538/// nest — each statement saves the caller's snapshot and puts it back
539/// instead of blindly clearing, so a top-level `execute_query` called
540/// from inside another statement dispatch (e.g. vector source subqueries)
541/// doesn't strip visibility from the outer scan.
542pub(crate) struct CurrentSnapshotGuard {
543    previous: Option<SnapshotContext>,
544}
545
546impl CurrentSnapshotGuard {
547    pub(crate) fn install(ctx: SnapshotContext) -> Self {
548        let previous = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
549        set_current_snapshot(ctx);
550        Self { previous }
551    }
552}
553
554impl Drop for CurrentSnapshotGuard {
555    fn drop(&mut self) {
556        let prev = self.previous.take();
557        let has = prev.is_some();
558        CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = prev);
559        HAS_SNAPSHOT.with(|c| c.set(has));
560    }
561}
562
563/// Is this entity visible under the current thread's MVCC snapshot?
564///
565/// Returns `true` (no filtering) when no snapshot is installed — that
566/// path is used by embedded callers and by operations that intentionally
567/// bypass MVCC (VACUUM, snapshot export, admin introspection).
568///
569/// When a snapshot is installed the result is
570///   `snapshot.sees(xmin, xmax) && !mgr.is_aborted(xmin) && !xmax_half_abort`
571/// where `xmax_half_abort` re-grants visibility for tuples whose
572/// deleting transaction rolled back.
573#[inline]
574pub fn entity_visible_under_current_snapshot(
575    entity: &crate::storage::unified::entity::UnifiedEntity,
576) -> bool {
577    // Fast path — one `Cell<bool>` read, no RefCell borrow. Autocommit
578    // reads (no active MVCC transaction) still hide superseded physical
579    // versions while avoiding a full snapshot-context lookup.
580    // This runs on every row of every scan; the slow path only fires
581    // inside an explicit transaction.
582    if !HAS_SNAPSHOT.with(|c| c.get()) {
583        return entity.xmax == 0;
584    }
585    CURRENT_SNAPSHOT.with(|cell| {
586        let guard = cell.borrow();
587        let Some(ctx) = guard.as_ref() else {
588            return true;
589        };
590        visibility_check(ctx, entity.xmin, entity.xmax)
591    })
592}
593
594/// Direct visibility check from raw `(xmin, xmax)` — bypasses the
595/// entity borrow for callers that already decomposed the tuple (e.g.
596/// pre-materialized scan caches). Same semantics as
597/// `entity_visible_under_current_snapshot`.
598#[inline]
599pub(crate) fn xids_visible_under_current_snapshot(xmin: u64, xmax: u64) -> bool {
600    if !HAS_SNAPSHOT.with(|c| c.get()) {
601        return true;
602    }
603    CURRENT_SNAPSHOT.with(|cell| {
604        let guard = cell.borrow();
605        let Some(ctx) = guard.as_ref() else {
606            return true;
607        };
608        visibility_check(ctx, xmin, xmax)
609    })
610}
611
612/// Clone the current thread's snapshot context. Parallel scan paths
613/// (`query_all_zoned` with `std::thread::scope`) call this on the main
614/// thread *before* spawning workers so the captured `SnapshotContext`
615/// can be moved into every worker closure. Worker threads do not
616/// inherit thread-locals, so calling `entity_visible_under_current_snapshot`
617/// from inside a spawned closure would silently skip the filter.
618pub fn capture_current_snapshot() -> Option<SnapshotContext> {
619    CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone())
620}
621
622/// Whether the active read snapshot may need historical tuple versions
623/// that the current secondary indexes cannot prove. Index paths can still
624/// recheck visible candidates, but only a heap scan can discover versions
625/// whose indexed value was changed or deleted after this snapshot.
626pub(crate) fn current_snapshot_requires_index_fallback() -> bool {
627    if !HAS_SNAPSHOT.with(|c| c.get()) {
628        return false;
629    }
630    CURRENT_SNAPSHOT.with(|cell| {
631        cell.borrow()
632            .as_ref()
633            .is_some_and(|ctx| ctx.requires_index_fallback)
634    })
635}
636
637/// Frozen MVCC + identity context for callers that need to reinstall
638/// the same view across thread-local boundaries — long-lived cursors,
639/// background batchers, anything that detaches from the dispatch path
640/// and re-enters later.
641///
642/// The bundle bakes in the three thread-locals every read path
643/// consults: `SnapshotContext` (MVCC visibility), the auth identity
644/// (RLS policy gate), and the tenant id (RLS scope). A FETCH that
645/// reinstalls the bundle sees exactly the same rows as the DECLARE
646/// would have, regardless of writes that landed in between.
647///
648/// Cheap to clone — `SnapshotContext` is a clone of three
649/// `Arc`-backed fields, identity is a `(String, Role)`, tenant is a
650/// `String`. None of these contend with the read path.
651#[derive(Clone, Default)]
652pub struct SnapshotBundle {
653    pub snapshot: Option<SnapshotContext>,
654    pub auth: Option<(String, crate::auth::Role)>,
655    pub tenant: Option<String>,
656}
657
658/// Capture the three read-path thread-locals into a `SnapshotBundle`.
659/// Pairs with `with_snapshot_bundle` for re-entry.
660pub fn snapshot_bundle() -> SnapshotBundle {
661    SnapshotBundle {
662        snapshot: capture_current_snapshot(),
663        auth: current_auth_identity(),
664        tenant: CURRENT_TENANT_ID.with(|cell| cell.borrow().clone()),
665    }
666}
667
668/// Reinstall a captured `SnapshotBundle` for the duration of `f`.
669/// Restores the caller's previous thread-locals on exit (panic-safe via
670/// the explicit guard struct so a panic in `f` cannot leak the
671/// installed identity into the worker's next request).
672pub fn with_snapshot_bundle<R>(bundle: &SnapshotBundle, f: impl FnOnce() -> R) -> R {
673    struct Guard {
674        prev_snapshot: Option<SnapshotContext>,
675        prev_auth: Option<(String, crate::auth::Role)>,
676        prev_tenant: Option<String>,
677    }
678    impl Drop for Guard {
679        fn drop(&mut self) {
680            let snap = self.prev_snapshot.take();
681            let has = snap.is_some();
682            CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = snap);
683            HAS_SNAPSHOT.with(|c| c.set(has));
684            CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = self.prev_auth.take());
685            CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = self.prev_tenant.take());
686        }
687    }
688
689    let _guard = {
690        let prev_snapshot = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
691        let prev_auth = CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone());
692        let prev_tenant = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
693
694        match bundle.snapshot.clone() {
695            Some(ctx) => set_current_snapshot(ctx),
696            None => clear_current_snapshot(),
697        }
698        CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = bundle.auth.clone());
699        CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = bundle.tenant.clone());
700
701        Guard {
702            prev_snapshot,
703            prev_auth,
704            prev_tenant,
705        }
706    };
707    f()
708}
709
710/// Apply the same visibility rules used by the thread-local helpers
711/// against a caller-provided context. Intended for parallel workers
712/// that captured the snapshot with `capture_current_snapshot()`.
713#[inline]
714pub fn entity_visible_with_context(
715    ctx: Option<&SnapshotContext>,
716    entity: &crate::storage::unified::entity::UnifiedEntity,
717) -> bool {
718    match ctx {
719        Some(ctx) => visibility_check(ctx, entity.xmin, entity.xmax),
720        None => true,
721    }
722}
723
724#[inline]
725fn visibility_check(ctx: &SnapshotContext, xmin: u64, xmax: u64) -> bool {
726    // Writer aborted → tuple never existed from any future reader's view.
727    // Checked *before* the own-xids fast path so an aborted own-sub-xid
728    // (rolled-back savepoint) stays hidden from the parent.
729    if xmin != 0 && ctx.manager.is_aborted(xmin) {
730        return false;
731    }
732    // Deleter aborted → treat xmax as unset; fall back to xmin-only check.
733    let effective_xmax = if xmax != 0 && ctx.manager.is_aborted(xmax) {
734        0
735    } else {
736        xmax
737    };
738    // Phase 2.3.2e: own-tx writes are always visible to the connection
739    // that stamped them, even when xmin/xmax exceed `snapshot.xid` (as
740    // happens for sub-xids allocated by SAVEPOINT after BEGIN).
741    let own_xmin = xmin != 0 && ctx.own_xids.contains(&xmin);
742    let own_xmax = effective_xmax != 0 && ctx.own_xids.contains(&effective_xmax);
743    if own_xmax {
744        // This connection deleted the row via this xid — hide it from self.
745        return false;
746    }
747    if own_xmin {
748        return true;
749    }
750    ctx.snapshot.sees(xmin, effective_xmax)
751}