Skip to main content

reddb_server/runtime/
execution_context.rs

1//! Per-thread runtime execution context.
2//!
3//! This module owns connection identity, auth/tenant scope, statement-local
4//! config/secret resolvers, and MVCC snapshot visibility helpers.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use crate::api::{RedDBError, RedDBResult};
10use crate::storage::schema::Value;
11use crate::storage::RedDB;
12
13thread_local! {
14    /// Current connection id for the executing statement. Set by the
15    /// per-connection wrapper (stdio/gRPC handlers) before dispatching
16    /// into `execute_query`; falls back to `0` for embedded callers.
17    static CURRENT_CONN_ID: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
18
19    /// Authenticated user + role for the executing statement (Phase 2.5.2
20    /// RLS enforcement). Set by the transport middleware after validating
21    /// credentials (password / cert / oauth); unset means "anonymous" /
22    /// "embedded" — RLS policies degrade to the role-agnostic subset.
23    ///
24    /// `None` skips RLS injection entirely; `Some((username, role))`
25    /// passes `role` to `matching_rls_policies(table, Some(role), action)`.
26    static CURRENT_AUTH_IDENTITY: std::cell::RefCell<Option<(String, crate::auth::Role)>> =
27        const { std::cell::RefCell::new(None) };
28
29    /// MVCC snapshot scoped to the currently-executing statement (Phase
30    /// 2.3.2d PG parity). `execute_query` captures it on entry and drops
31    /// it on exit; every scan consults it via
32    /// `entity_visible_under_current_snapshot` to hide tuples whose xmin
33    /// hasn't committed or whose xmax already has.
34    ///
35    /// `None` means "pre-MVCC semantics" — the read path returns every
36    /// tuple regardless of xmin/xmax. All embedded callers that bypass
37    /// `execute_query` see this default.
38    static CURRENT_SNAPSHOT: std::cell::RefCell<Option<SnapshotContext>> =
39        const { std::cell::RefCell::new(None) };
40
41    /// Cheap presence flag for `CURRENT_SNAPSHOT`. Scan hot paths
42    /// poll this instead of `borrow()`-ing the RefCell on every
43    /// row — the common case (autocommit / no MVCC session) reads
44    /// one atomic `Cell<bool>` and short-circuits, saving ~10ns × N
45    /// rows on aggregate_group / select_range scans.
46    static HAS_SNAPSHOT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
47
48    /// Session-scoped tenant id for the current connection (Phase 2.5.3
49    /// multi-tenancy). Populated by `SET TENANT 'id'` or by transport
50    /// middleware after resolving tenant from auth claims. Read by the
51    /// `CURRENT_TENANT()` scalar function — RLS policies typically
52    /// combine it as `USING (tenant_id = CURRENT_TENANT())` to scope
53    /// every query to one tenant.
54    ///
55    /// `None` means "no tenant bound" — `CURRENT_TENANT()` returns
56    /// NULL, and RLS policies that gate on it hide every row.
57    static CURRENT_TENANT_ID: std::cell::RefCell<Option<String>> =
58        const { std::cell::RefCell::new(None) };
59
60    /// Statement-local config resolver. SQL expressions materialize the
61    /// `red_config` snapshot lazily on the first `$config.*`/`CONFIG()`
62    /// access, keeping ordinary statements on the zero-scan path.
63    static CURRENT_CONFIG_RESOLVER: std::cell::RefCell<Option<ConfigResolver>> =
64        const { std::cell::RefCell::new(None) };
65
66    /// Statement-local secret resolver. SQL expressions materialize the
67    /// vault KV snapshot lazily on first `$secret.*` access, then use
68    /// lock-free map reads for the rest of the statement.
69    static CURRENT_SECRET_RESOLVER: std::cell::RefCell<Option<SecretResolver>> =
70        const { std::cell::RefCell::new(None) };
71}
72
/// Snapshot + manager pair used for read-path visibility checks.
///
/// The manager is needed in addition to the snapshot because abort state
/// keeps changing after the snapshot is captured. A writer that rolls back
/// later must still have its tuples hidden, and that is only observable
/// through `SnapshotManager::is_aborted`. Cloning the `Arc` is a refcount
/// bump. The cost of each `is_aborted` call is whatever `SnapshotManager`
/// implements (presumably a set lookup under a read lock — confirm).
///
/// `own_xids` (Phase 2.3.2e) lists the xids belonging to the current
/// connection's transaction — the parent xid plus open and released
/// savepoint sub-xids. The visibility rule promotes rows stamped with
/// these xids to "always visible (unless aborted)" so the writer sees
/// its own nested-savepoint writes even though their xids exceed
/// `snapshot.xid`.
///
/// `#[derive(Clone)]` deep-copies the `own_xids` set, so cloning a context
/// allocates.
#[derive(Clone)]
pub struct SnapshotContext {
    /// Snapshot whose `sees(xmin, xmax)` makes the final visibility call.
    pub snapshot: crate::storage::transaction::snapshot::Snapshot,
    /// Shared manager consulted for `is_aborted` on a tuple's xmin/xmax.
    pub manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
    /// Xids owned by this connection's transaction (parent + savepoint sub-xids).
    pub own_xids: std::collections::HashSet<crate::storage::transaction::snapshot::Xid>,
    /// When set, secondary indexes cannot prove visibility of historical
    /// versions and a heap scan is required; reported through
    /// `current_snapshot_requires_index_fallback`.
    pub requires_index_fallback: bool,
}
94
95/// Install a connection id on the current thread for the duration of a
96/// statement. Transaction state (`RuntimeInner::tx_contexts`) is keyed
97/// by this id so different connections can hold independent BEGINs.
98///
99/// Pub so transports (PG wire, gRPC, HTTP per-request spawners) and
100/// tests can emulate per-connection isolation. Call it once when
101/// binding the connection's worker thread; pair with
102/// `clear_current_connection_id` on teardown.
103pub fn set_current_connection_id(id: u64) {
104    CURRENT_CONN_ID.with(|c| c.set(id));
105}
106
107/// Reset the thread's connection id back to `0` (autocommit).
108pub fn clear_current_connection_id() {
109    CURRENT_CONN_ID.with(|c| c.set(0));
110}
111
112/// Read the connection id set by `set_current_connection_id`. Returns
113/// `0` when no wrapper installed one — auto-commit path.
114pub fn current_connection_id() -> u64 {
115    CURRENT_CONN_ID.with(|c| c.get())
116}
117
118/// Install the authenticated identity for the current thread (Phase 2.5.2
119/// RLS enforcement). Transport layers call this right after resolving
120/// auth so the query dispatch can fold RLS policies into the filter.
121pub fn set_current_auth_identity(username: String, role: crate::auth::Role) {
122    CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = Some((username, role)));
123}
124
125/// Clear the thread-local auth identity. Transports call this after the
126/// statement completes so pooled threads don't leak identities across
127/// requests.
128pub fn clear_current_auth_identity() {
129    CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = None);
130}
131
132/// Read the current-thread auth identity. `None` when no transport
133/// installed one (embedded mode / anonymous access).
134pub(crate) fn current_auth_identity() -> Option<(String, crate::auth::Role)> {
135    CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone())
136}
137
138/// Public probe of the thread-local auth identity for callers outside
139/// the `runtime` module (e.g. the AI credential resolver, which audits
140/// who triggered a secret read on behalf of a query).
141pub fn current_auth_identity_for_audit() -> Option<(String, crate::auth::Role)> {
142    current_auth_identity()
143}
144
145/// Install the session tenant id for the current thread (Phase 2.5.3
146/// multi-tenancy). Called by `SET TENANT 'id'` dispatch and by
147/// transport middleware that resolves tenant from auth claims (e.g.
148/// JWT `tenant` claim, HTTP header, subdomain).
149pub fn set_current_tenant(tenant_id: String) {
150    CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = Some(tenant_id));
151}
152
153/// Clear the current-thread tenant — `CURRENT_TENANT()` will then
154/// return NULL and any RLS policy gated on it will hide every row.
155pub fn clear_current_tenant() {
156    CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = None);
157}
158
159/// Read the current-thread tenant id, applying overrides in priority order:
160///   1. `WITHIN TENANT '<id>' …` per-statement override (highest)
161///   2. `SET LOCAL TENANT '<id>'` transaction-local override (consulted
162///      only when the current connection has an open transaction)
163///   3. `SET TENANT '<id>'` session-level thread-local
164///   4. `None` (deny-default for RLS).
165///
166/// The transaction-local layer is read through the runtime; an embedded
167/// helper crate that has no `RedDBRuntime` access still gets correct
168/// behaviour for layers 1, 3, and 4.
169pub fn current_tenant() -> Option<String> {
170    let inherited = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
171    if let Some(over) = current_scope_override() {
172        if over.tenant.is_active() {
173            return over.tenant.resolve(inherited);
174        }
175    }
176    if let Some(tx_local) = current_tx_local_tenant() {
177        return tx_local;
178    }
179    inherited
180}
181
thread_local! {
    /// Per-call copy of the active connection's `tx_local_tenants` entry
    /// (the `SET LOCAL TENANT` layer consulted by `current_tenant`).
    ///
    /// Encoding:
    /// - `None`: no transaction-local override for this call;
    /// - `Some(Some(s))`: override the tenant to `s`;
    /// - `Some(None)`: override the tenant to NULL (explicitly cleared).
    ///
    /// Refreshed at the top of every `execute_query` invocation through
    /// `TxLocalTenantGuard::install` and reset when that guard drops, so
    /// pooled connections cannot carry the override past the statement
    /// that owns it.
    static TX_LOCAL_TENANT: std::cell::RefCell<Option<Option<String>>> =
        const { std::cell::RefCell::new(None) };
}
194
195fn current_tx_local_tenant() -> Option<Option<String>> {
196    TX_LOCAL_TENANT.with(|cell| cell.borrow().clone())
197}
198
199/// Recognise `SET LOCAL TENANT '<id>'` / `SET LOCAL TENANT NULL` —
200/// returns `Ok(Some(Some(id)))` for an explicit value, `Ok(Some(None))`
201/// for an explicit NULL clear, `Ok(None)` when the input is not a
202/// `SET LOCAL TENANT` statement at all, and `Err` when the prefix
203/// matches but the value is malformed.
204pub(crate) fn parse_set_local_tenant(query: &str) -> RedDBResult<Option<Option<String>>> {
205    let mut tokens = query.split_ascii_whitespace();
206    let Some(w1) = tokens.next() else {
207        return Ok(None);
208    };
209    if !w1.eq_ignore_ascii_case("SET") {
210        return Ok(None);
211    }
212    let Some(w2) = tokens.next() else {
213        return Ok(None);
214    };
215    if !w2.eq_ignore_ascii_case("LOCAL") {
216        return Ok(None);
217    }
218    let Some(w3) = tokens.next() else {
219        return Ok(None);
220    };
221    if !w3.eq_ignore_ascii_case("TENANT") {
222        return Ok(None);
223    }
224    let rest: String = tokens.collect::<Vec<_>>().join(" ");
225    let rest = rest.trim().trim_end_matches(';').trim();
226    let value_str = rest.strip_prefix('=').map(|s| s.trim()).unwrap_or(rest);
227    if value_str.is_empty() {
228        return Err(RedDBError::Query(
229            "SET LOCAL TENANT expects a string literal or NULL".to_string(),
230        ));
231    }
232    if value_str.eq_ignore_ascii_case("NULL") {
233        return Ok(Some(None));
234    }
235    if value_str.starts_with('\'') && value_str.ends_with('\'') && value_str.len() >= 2 {
236        let inner = &value_str[1..value_str.len() - 1];
237        return Ok(Some(Some(inner.to_string())));
238    }
239    Err(RedDBError::Query(format!(
240        "SET LOCAL TENANT expects a string literal or NULL, got `{value_str}`"
241    )))
242}
243
244pub(crate) struct TxLocalTenantGuard;
245
246impl TxLocalTenantGuard {
247    pub fn install(value: Option<Option<String>>) -> Self {
248        TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = value);
249        Self
250    }
251}
252
253impl Drop for TxLocalTenantGuard {
254    fn drop(&mut self) {
255        TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = None);
256    }
257}
258
thread_local! {
    /// Stack of `WITHIN ... <stmt>` overrides active on the current
    /// thread, innermost last. Each entry belongs to one in-flight
    /// `execute_query` call that started with a `WITHIN` prefix. It is
    /// pushed before dispatch and popped before that call returns, and
    /// `ScopeOverrideGuard` keeps the push/pop balanced on early returns.
    /// A stack rather than a single slot supports nested invocations (e.g. a
    /// view body that itself re-enters `execute_query`). Readers only look
    /// at the top entry via `current_scope_override`.
    static SCOPE_OVERRIDES: std::cell::RefCell<Vec<crate::runtime::within_clause::ScopeOverride>> =
        const { std::cell::RefCell::new(Vec::new()) };
}
269
270pub(crate) fn push_scope_override(over: crate::runtime::within_clause::ScopeOverride) {
271    SCOPE_OVERRIDES.with(|cell| cell.borrow_mut().push(over));
272}
273
274pub(crate) fn pop_scope_override() {
275    SCOPE_OVERRIDES.with(|cell| {
276        cell.borrow_mut().pop();
277    });
278}
279
280pub(crate) fn current_scope_override() -> Option<crate::runtime::within_clause::ScopeOverride> {
281    SCOPE_OVERRIDES.with(|cell| cell.borrow().last().cloned())
282}
283
284/// Cheap probe: is any `WITHIN …` scope override active on this
285/// thread? The fast-path needs to know without paying for the full
286/// `.last().cloned()` allocation — just peek at stack length.
287pub(crate) fn has_scope_override_active() -> bool {
288    SCOPE_OVERRIDES.with(|cell| !cell.borrow().is_empty())
289}
290
291/// RAII guard pairing `push_scope_override` with the matching pop, so
292/// the stack stays balanced even when the inner `execute_query` returns
293/// early via `?`.
294pub(crate) struct ScopeOverrideGuard;
295
296impl ScopeOverrideGuard {
297    pub fn install(over: crate::runtime::within_clause::ScopeOverride) -> Self {
298        push_scope_override(over);
299        Self
300    }
301}
302
303impl Drop for ScopeOverrideGuard {
304    fn drop(&mut self) {
305        pop_scope_override();
306    }
307}
308
309/// Read the current-thread auth identity, honouring per-statement
310/// `WITHIN ... USER '<u>' AS ROLE '<r>'` overrides. The override only
311/// supplies projected strings — it never grants additional privilege —
312/// so callers that need to make authorisation decisions must read from
313/// the underlying `current_auth_identity()` directly.
314pub(crate) fn current_user_projected() -> Option<String> {
315    let inherited = current_auth_identity().map(|(u, _)| u);
316    if let Some(over) = current_scope_override() {
317        if over.user.is_active() {
318            return over.user.resolve(inherited);
319        }
320    }
321    inherited
322}
323
324pub(crate) fn current_role_projected() -> Option<String> {
325    let inherited = current_auth_identity().map(|(_, r)| format!("{r:?}").to_lowercase());
326    if let Some(over) = current_scope_override() {
327        if over.role.is_active() {
328            return over.role.resolve(inherited);
329        }
330    }
331    inherited
332}
333
334pub(crate) fn current_secret_value(path: &str) -> Option<String> {
335    let key = path.to_ascii_lowercase();
336    CURRENT_SECRET_RESOLVER.with(|cell| {
337        let mut resolver = cell.borrow_mut();
338        let resolver = resolver.as_mut()?;
339        if resolver.values.is_none() {
340            resolver.values = resolver
341                .store
342                .as_ref()
343                .map(|store| store.vault_kv_snapshot());
344        }
345        let values = resolver.values.as_ref()?;
346        values.get(&key).cloned().or_else(|| {
347            key.strip_prefix("red.vault/").and_then(|rest| {
348                values
349                    .get(rest)
350                    .cloned()
351                    .or_else(|| values.get(&format!("red.secret.{rest}")).cloned())
352            })
353        })
354    })
355}
356
357struct SecretResolver {
358    store: Option<Arc<crate::auth::store::AuthStore>>,
359    values: Option<HashMap<String, String>>,
360}
361
362pub(crate) struct SecretStoreGuard {
363    previous: Option<SecretResolver>,
364}
365
366impl SecretStoreGuard {
367    pub(super) fn install(store: Option<Arc<crate::auth::store::AuthStore>>) -> Self {
368        let previous = CURRENT_SECRET_RESOLVER.with(|cell| {
369            cell.replace(Some(SecretResolver {
370                store,
371                values: None,
372            }))
373        });
374        Self { previous }
375    }
376}
377
378impl Drop for SecretStoreGuard {
379    fn drop(&mut self) {
380        let previous = self.previous.take();
381        CURRENT_SECRET_RESOLVER.with(|cell| {
382            cell.replace(previous);
383        });
384    }
385}
386
387pub(crate) fn current_config_value(path: &str) -> Option<Value> {
388    let key = path.to_ascii_lowercase();
389    CURRENT_CONFIG_RESOLVER.with(|cell| {
390        let mut resolver = cell.borrow_mut();
391        let resolver = resolver.as_mut()?;
392        if resolver.values.is_none() {
393            resolver.values = Some(latest_config_snapshot(&resolver.db));
394        }
395        let values = resolver.values.as_ref()?;
396        // #1370 — `$config.<path>` desugars to `CONFIG("red.config/<path>")`,
397        // but `SET CONFIG <path>` stores under the bare key. Mirror the secret
398        // resolver: after the namespaced key, fall back to the stripped bare
399        // key, then the dotted `red.config.<rest>` legacy form.
400        values.get(&key).cloned().or_else(|| {
401            key.strip_prefix("red.config/").and_then(|rest| {
402                values
403                    .get(rest)
404                    .cloned()
405                    .or_else(|| values.get(&format!("red.config.{rest}")).cloned())
406            })
407        })
408    })
409}
410
411pub(crate) fn update_current_config_value(path: &str, value: Value) {
412    let key = path.to_ascii_lowercase();
413    CURRENT_CONFIG_RESOLVER.with(|cell| {
414        if let Some(resolver) = cell.borrow_mut().as_mut() {
415            if let Some(values) = resolver.values.as_mut() {
416                values.insert(key, value);
417            }
418        }
419    });
420}
421
422pub(crate) fn update_current_secret_value(path: &str, value: Option<String>) {
423    let key = path.to_ascii_lowercase();
424    CURRENT_SECRET_RESOLVER.with(|cell| {
425        if let Some(resolver) = cell.borrow_mut().as_mut() {
426            let Some(values) = resolver.values.as_mut() else {
427                return;
428            };
429            match value {
430                Some(value) => {
431                    values.insert(key, value);
432                }
433                None => {
434                    values.remove(&key);
435                }
436            }
437        }
438    });
439}
440
441fn latest_config_snapshot(db: &RedDB) -> HashMap<String, Value> {
442    let mut latest: HashMap<String, (u64, Value)> = HashMap::new();
443
444    if let Some(manager) = db.store().get_collection("red_config") {
445        manager.for_each_entity(|entity| {
446            let Some(row) = entity.data.as_row() else {
447                return true;
448            };
449            let Some(Value::Text(key)) = row.get_field("key") else {
450                return true;
451            };
452            let value = row.get_field("value").cloned().unwrap_or(Value::Null);
453            let id = entity.id.raw();
454            let key = key.to_ascii_lowercase();
455            insert_latest_config_value(&mut latest, key.clone(), id, value.clone());
456            if let Some(rest) = key.strip_prefix("red.config.") {
457                insert_latest_config_value(&mut latest, format!("red.config/{rest}"), id, value);
458            }
459            true
460        });
461    }
462
463    if let Some(manager) = db.store().get_collection("red.config") {
464        manager.for_each_entity(|entity| {
465            let Some(row) = entity.data.as_row() else {
466                return true;
467            };
468            if matches!(row.get_field("tombstone"), Some(Value::Boolean(true))) {
469                return true;
470            }
471            let Some(Value::Text(key)) = row.get_field("key") else {
472                return true;
473            };
474            let value = row.get_field("value").cloned().unwrap_or(Value::Null);
475            insert_latest_config_value(
476                &mut latest,
477                format!("red.config/{}", key.to_ascii_lowercase()),
478                entity.id.raw(),
479                value,
480            );
481            true
482        });
483    }
484
485    latest
486        .into_iter()
487        .map(|(key, (_, value))| (key, value))
488        .collect()
489}
490
491fn insert_latest_config_value(
492    latest: &mut HashMap<String, (u64, Value)>,
493    key: String,
494    id: u64,
495    value: Value,
496) {
497    match latest.get(&key) {
498        Some((prev_id, _)) if *prev_id > id => {}
499        _ => {
500            latest.insert(key, (id, value));
501        }
502    }
503}
504
505struct ConfigResolver {
506    db: Arc<RedDB>,
507    values: Option<HashMap<String, Value>>,
508}
509
510pub(crate) struct ConfigSnapshotGuard {
511    previous: Option<ConfigResolver>,
512}
513
514impl ConfigSnapshotGuard {
515    pub(super) fn install(db: Arc<RedDB>) -> Self {
516        let previous = CURRENT_CONFIG_RESOLVER
517            .with(|cell| cell.replace(Some(ConfigResolver { db, values: None })));
518        Self { previous }
519    }
520}
521
522impl Drop for ConfigSnapshotGuard {
523    fn drop(&mut self) {
524        let previous = self.previous.take();
525        CURRENT_CONFIG_RESOLVER.with(|cell| {
526            cell.replace(previous);
527        });
528    }
529}
530
531/// Install the MVCC snapshot used by the current thread for the duration
532/// of one statement. Paired with `clear_current_snapshot()` — callers
533/// should prefer the `CurrentSnapshotGuard` RAII wrapper so early returns
534/// still clean up.
535pub fn set_current_snapshot(ctx: SnapshotContext) {
536    CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = Some(ctx));
537    HAS_SNAPSHOT.with(|c| c.set(true));
538}
539
540pub fn clear_current_snapshot() {
541    CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = None);
542    HAS_SNAPSHOT.with(|c| c.set(false));
543}
544
545/// Drop-guard that restores the previous snapshot on scope exit. Safe to
546/// nest — each statement saves the caller's snapshot and puts it back
547/// instead of blindly clearing, so a top-level `execute_query` called
548/// from inside another statement dispatch (e.g. vector source subqueries)
549/// doesn't strip visibility from the outer scan.
550pub(crate) struct CurrentSnapshotGuard {
551    previous: Option<SnapshotContext>,
552}
553
554impl CurrentSnapshotGuard {
555    pub(crate) fn install(ctx: SnapshotContext) -> Self {
556        let previous = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
557        set_current_snapshot(ctx);
558        Self { previous }
559    }
560}
561
562impl Drop for CurrentSnapshotGuard {
563    fn drop(&mut self) {
564        let prev = self.previous.take();
565        let has = prev.is_some();
566        CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = prev);
567        HAS_SNAPSHOT.with(|c| c.set(has));
568    }
569}
570
571/// Is this entity visible under the current thread's MVCC snapshot?
572///
573/// Returns `true` (no filtering) when no snapshot is installed — that
574/// path is used by embedded callers and by operations that intentionally
575/// bypass MVCC (VACUUM, snapshot export, admin introspection).
576///
577/// When a snapshot is installed the result is
578///   `snapshot.sees(xmin, xmax) && !mgr.is_aborted(xmin) && !xmax_half_abort`
579/// where `xmax_half_abort` re-grants visibility for tuples whose
580/// deleting transaction rolled back.
581#[inline]
582pub fn entity_visible_under_current_snapshot(
583    entity: &crate::storage::unified::entity::UnifiedEntity,
584) -> bool {
585    // Moderation visibility gate (#1274, ADR 0057). A row carrying the
586    // moderation status marker — quarantine-pending or rejected-tombstone
587    // — is hidden from every normal read, on top of MVCC visibility. The
588    // marker lives on the row itself, so the check is a single field probe
589    // and rides the existing per-row visibility chokepoint rather than
590    // adding a separate filter pass to each scan call-site.
591    if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
592        return false;
593    }
594    // Fast path — one `Cell<bool>` read, no RefCell borrow. Autocommit
595    // reads (no active MVCC transaction) still hide superseded physical
596    // versions while avoiding a full snapshot-context lookup.
597    // This runs on every row of every scan; the slow path only fires
598    // inside an explicit transaction.
599    if !HAS_SNAPSHOT.with(|c| c.get()) {
600        return entity.xmax == 0;
601    }
602    CURRENT_SNAPSHOT.with(|cell| {
603        let guard = cell.borrow();
604        let Some(ctx) = guard.as_ref() else {
605            return true;
606        };
607        visibility_check(ctx, entity.xmin, entity.xmax)
608    })
609}
610
611/// Direct visibility check from raw `(xmin, xmax)` — bypasses the
612/// entity borrow for callers that already decomposed the tuple (e.g.
613/// pre-materialized scan caches). Same semantics as
614/// `entity_visible_under_current_snapshot`.
615#[inline]
616pub(crate) fn xids_visible_under_current_snapshot(xmin: u64, xmax: u64) -> bool {
617    if !HAS_SNAPSHOT.with(|c| c.get()) {
618        return true;
619    }
620    CURRENT_SNAPSHOT.with(|cell| {
621        let guard = cell.borrow();
622        let Some(ctx) = guard.as_ref() else {
623            return true;
624        };
625        visibility_check(ctx, xmin, xmax)
626    })
627}
628
629/// Clone the current thread's snapshot context. Parallel scan paths
630/// (`query_all_zoned` with `std::thread::scope`) call this on the main
631/// thread *before* spawning workers so the captured `SnapshotContext`
632/// can be moved into every worker closure. Worker threads do not
633/// inherit thread-locals, so calling `entity_visible_under_current_snapshot`
634/// from inside a spawned closure would silently skip the filter.
635pub fn capture_current_snapshot() -> Option<SnapshotContext> {
636    CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone())
637}
638
639/// Whether the active read snapshot may need historical tuple versions
640/// that the current secondary indexes cannot prove. Index paths can still
641/// recheck visible candidates, but only a heap scan can discover versions
642/// whose indexed value was changed or deleted after this snapshot.
643pub(crate) fn current_snapshot_requires_index_fallback() -> bool {
644    if !HAS_SNAPSHOT.with(|c| c.get()) {
645        return false;
646    }
647    CURRENT_SNAPSHOT.with(|cell| {
648        cell.borrow()
649            .as_ref()
650            .is_some_and(|ctx| ctx.requires_index_fallback)
651    })
652}
653
654/// Frozen MVCC + identity context for callers that need to reinstall
655/// the same view across thread-local boundaries — long-lived cursors,
656/// background batchers, anything that detaches from the dispatch path
657/// and re-enters later.
658///
659/// The bundle bakes in the three thread-locals every read path
660/// consults: `SnapshotContext` (MVCC visibility), the auth identity
661/// (RLS policy gate), and the tenant id (RLS scope). A FETCH that
662/// reinstalls the bundle sees exactly the same rows as the DECLARE
663/// would have, regardless of writes that landed in between.
664///
665/// Cheap to clone — `SnapshotContext` is a clone of three
666/// `Arc`-backed fields, identity is a `(String, Role)`, tenant is a
667/// `String`. None of these contend with the read path.
668#[derive(Clone, Default)]
669pub struct SnapshotBundle {
670    pub snapshot: Option<SnapshotContext>,
671    pub auth: Option<(String, crate::auth::Role)>,
672    pub tenant: Option<String>,
673}
674
675/// Capture the three read-path thread-locals into a `SnapshotBundle`.
676/// Pairs with `with_snapshot_bundle` for re-entry.
677pub fn snapshot_bundle() -> SnapshotBundle {
678    SnapshotBundle {
679        snapshot: capture_current_snapshot(),
680        auth: current_auth_identity(),
681        tenant: CURRENT_TENANT_ID.with(|cell| cell.borrow().clone()),
682    }
683}
684
685/// Reinstall a captured `SnapshotBundle` for the duration of `f`.
686/// Restores the caller's previous thread-locals on exit (panic-safe via
687/// the explicit guard struct so a panic in `f` cannot leak the
688/// installed identity into the worker's next request).
689pub fn with_snapshot_bundle<R>(bundle: &SnapshotBundle, f: impl FnOnce() -> R) -> R {
690    struct Guard {
691        prev_snapshot: Option<SnapshotContext>,
692        prev_auth: Option<(String, crate::auth::Role)>,
693        prev_tenant: Option<String>,
694    }
695    impl Drop for Guard {
696        fn drop(&mut self) {
697            let snap = self.prev_snapshot.take();
698            let has = snap.is_some();
699            CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = snap);
700            HAS_SNAPSHOT.with(|c| c.set(has));
701            CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = self.prev_auth.take());
702            CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = self.prev_tenant.take());
703        }
704    }
705
706    let _guard = {
707        let prev_snapshot = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
708        let prev_auth = CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone());
709        let prev_tenant = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
710
711        match bundle.snapshot.clone() {
712            Some(ctx) => set_current_snapshot(ctx),
713            None => clear_current_snapshot(),
714        }
715        CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = bundle.auth.clone());
716        CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = bundle.tenant.clone());
717
718        Guard {
719            prev_snapshot,
720            prev_auth,
721            prev_tenant,
722        }
723    };
724    f()
725}
726
727/// Apply the same visibility rules used by the thread-local helpers
728/// against a caller-provided context. Intended for parallel workers
729/// that captured the snapshot with `capture_current_snapshot()`.
730#[inline]
731pub fn entity_visible_with_context(
732    ctx: Option<&SnapshotContext>,
733    entity: &crate::storage::unified::entity::UnifiedEntity,
734) -> bool {
735    // Same moderation visibility gate as the thread-local path (#1274):
736    // parallel scan workers capture the snapshot context but must apply
737    // the moderation marker check identically.
738    if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
739        return false;
740    }
741    match ctx {
742        Some(ctx) => visibility_check(ctx, entity.xmin, entity.xmax),
743        None => true,
744    }
745}
746
747#[inline]
748fn visibility_check(ctx: &SnapshotContext, xmin: u64, xmax: u64) -> bool {
749    // Writer aborted → tuple never existed from any future reader's view.
750    // Checked *before* the own-xids fast path so an aborted own-sub-xid
751    // (rolled-back savepoint) stays hidden from the parent.
752    if xmin != 0 && ctx.manager.is_aborted(xmin) {
753        return false;
754    }
755    // Deleter aborted → treat xmax as unset; fall back to xmin-only check.
756    let effective_xmax = if xmax != 0 && ctx.manager.is_aborted(xmax) {
757        0
758    } else {
759        xmax
760    };
761    // Phase 2.3.2e: own-tx writes are always visible to the connection
762    // that stamped them, even when xmin/xmax exceed `snapshot.xid` (as
763    // happens for sub-xids allocated by SAVEPOINT after BEGIN).
764    let own_xmin = xmin != 0 && ctx.own_xids.contains(&xmin);
765    let own_xmax = effective_xmax != 0 && ctx.own_xids.contains(&effective_xmax);
766    if own_xmax {
767        // This connection deleted the row via this xid — hide it from self.
768        return false;
769    }
770    if own_xmin {
771        return true;
772    }
773    ctx.snapshot.sees(xmin, effective_xmax)
774}