Skip to main content

reddb_server/runtime/
execution_context.rs

1//! Per-thread runtime execution context.
2//!
3//! This module owns connection identity, auth/tenant scope, statement-local
4//! config/secret resolvers, and MVCC snapshot visibility helpers.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use crate::api::{RedDBError, RedDBResult};
10use crate::storage::schema::Value;
11use crate::storage::RedDB;
12
13thread_local! {
14    /// Current connection id for the executing statement. Set by the
15    /// per-connection wrapper (stdio/gRPC handlers) before dispatching
16    /// into `execute_query`; falls back to `0` for embedded callers.
17    static CURRENT_CONN_ID: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
18
19    /// Authenticated user + role for the executing statement (Phase 2.5.2
20    /// RLS enforcement). Set by the transport middleware after validating
21    /// credentials (password / cert / oauth); unset means "anonymous" /
22    /// "embedded" — RLS policies degrade to the role-agnostic subset.
23    ///
24    /// `None` skips RLS injection entirely; `Some((username, role))`
25    /// passes `role` to `matching_rls_policies(table, Some(role), action)`.
26    static CURRENT_AUTH_IDENTITY: std::cell::RefCell<Option<(String, crate::auth::Role)>> =
27        const { std::cell::RefCell::new(None) };
28
29    /// MVCC snapshot scoped to the currently-executing statement (Phase
30    /// 2.3.2d PG parity). `execute_query` captures it on entry and drops
31    /// it on exit; every scan consults it via
32    /// `entity_visible_under_current_snapshot` to hide tuples whose xmin
33    /// hasn't committed or whose xmax already has.
34    ///
35    /// `None` means "pre-MVCC semantics" — the read path returns every
36    /// tuple regardless of xmin/xmax. All embedded callers that bypass
37    /// `execute_query` see this default.
38    static CURRENT_SNAPSHOT: std::cell::RefCell<Option<SnapshotContext>> =
39        const { std::cell::RefCell::new(None) };
40
41    /// Cheap presence flag for `CURRENT_SNAPSHOT`. Scan hot paths
42    /// poll this instead of `borrow()`-ing the RefCell on every
43    /// row — the common case (autocommit / no MVCC session) reads
44    /// one atomic `Cell<bool>` and short-circuits, saving ~10ns × N
45    /// rows on aggregate_group / select_range scans.
46    static HAS_SNAPSHOT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
47
48    /// Session-scoped tenant id for the current connection (Phase 2.5.3
49    /// multi-tenancy). Populated by `SET TENANT 'id'` or by transport
50    /// middleware after resolving tenant from auth claims. Read by the
51    /// `CURRENT_TENANT()` scalar function — RLS policies typically
52    /// combine it as `USING (tenant_id = CURRENT_TENANT())` to scope
53    /// every query to one tenant.
54    ///
55    /// `None` means "no tenant bound" — `CURRENT_TENANT()` returns
56    /// NULL, and RLS policies that gate on it hide every row.
57    static CURRENT_TENANT_ID: std::cell::RefCell<Option<String>> =
58        const { std::cell::RefCell::new(None) };
59
60    /// Statement-local config resolver. SQL expressions materialize the
61    /// `red_config` snapshot lazily on the first `$config.*`/`CONFIG()`
62    /// access, keeping ordinary statements on the zero-scan path.
63    static CURRENT_CONFIG_RESOLVER: std::cell::RefCell<Option<ConfigResolver>> =
64        const { std::cell::RefCell::new(None) };
65
66    /// Statement-local secret resolver. SQL expressions materialize the
67    /// vault KV snapshot lazily on first `$secret.*` access, then use
68    /// lock-free map reads for the rest of the statement.
69    static CURRENT_SECRET_RESOLVER: std::cell::RefCell<Option<SecretResolver>> =
70        const { std::cell::RefCell::new(None) };
71}
72
73/// Snapshot + manager pair used for read-path visibility checks.
74///
75/// The manager is needed in addition to the snapshot because `aborted`
76/// state mutates after the snapshot is captured — a ROLLBACK by a
77/// committed-at-capture-time writer must still hide its tuples. Keeping
78/// the Arc around is O(pointer) and the RwLock reads on `is_aborted`
79/// are cheap (HashSet lookup under a parking_lot read guard).
80///
81/// `own_xids` (Phase 2.3.2e) lists the xids belonging to the current
82/// connection's transaction — the parent xid plus open and released
83/// savepoint sub-xids. The visibility rule promotes rows stamped with
84/// these xids to "always visible (unless aborted)" so the writer sees
85/// its own nested-savepoint writes even though their xids exceed
86/// `snapshot.xid`.
87#[derive(Clone)]
88pub struct SnapshotContext {
89    pub snapshot: crate::storage::transaction::snapshot::Snapshot,
90    pub manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
91    pub own_xids: std::collections::HashSet<crate::storage::transaction::snapshot::Xid>,
92    pub requires_index_fallback: bool,
93}
94
95/// Install a connection id on the current thread for the duration of a
96/// statement. Transaction state (`RuntimeInner::tx_contexts`) is keyed
97/// by this id so different connections can hold independent BEGINs.
98///
99/// Pub so transports (PG wire, gRPC, HTTP per-request spawners) and
100/// tests can emulate per-connection isolation. Call it once when
101/// binding the connection's worker thread; pair with
102/// `clear_current_connection_id` on teardown.
103pub fn set_current_connection_id(id: u64) {
104    CURRENT_CONN_ID.with(|c| c.set(id));
105}
106
107/// Reset the thread's connection id back to `0` (autocommit).
108pub fn clear_current_connection_id() {
109    CURRENT_CONN_ID.with(|c| c.set(0));
110}
111
112/// Read the connection id set by `set_current_connection_id`. Returns
113/// `0` when no wrapper installed one — auto-commit path.
114pub fn current_connection_id() -> u64 {
115    CURRENT_CONN_ID.with(|c| c.get())
116}
117
118/// Install the authenticated identity for the current thread (Phase 2.5.2
119/// RLS enforcement). Transport layers call this right after resolving
120/// auth so the query dispatch can fold RLS policies into the filter.
121pub fn set_current_auth_identity(username: String, role: crate::auth::Role) {
122    CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = Some((username, role)));
123}
124
125/// Clear the thread-local auth identity. Transports call this after the
126/// statement completes so pooled threads don't leak identities across
127/// requests.
128pub fn clear_current_auth_identity() {
129    CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = None);
130}
131
132/// Read the current-thread auth identity. `None` when no transport
133/// installed one (embedded mode / anonymous access).
134pub(crate) fn current_auth_identity() -> Option<(String, crate::auth::Role)> {
135    CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone())
136}
137
138/// Public probe of the thread-local auth identity for callers outside
139/// the `runtime` module (e.g. the AI credential resolver, which audits
140/// who triggered a secret read on behalf of a query).
141pub fn current_auth_identity_for_audit() -> Option<(String, crate::auth::Role)> {
142    current_auth_identity()
143}
144
145/// Install the session tenant id for the current thread (Phase 2.5.3
146/// multi-tenancy). Called by `SET TENANT 'id'` dispatch and by
147/// transport middleware that resolves tenant from auth claims (e.g.
148/// JWT `tenant` claim, HTTP header, subdomain).
149pub fn set_current_tenant(tenant_id: String) {
150    CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = Some(tenant_id));
151}
152
153/// Clear the current-thread tenant — `CURRENT_TENANT()` will then
154/// return NULL and any RLS policy gated on it will hide every row.
155pub fn clear_current_tenant() {
156    CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = None);
157}
158
159/// Read the current-thread tenant id, applying overrides in priority order:
160///   1. `WITHIN TENANT '<id>' …` per-statement override (highest)
161///   2. `SET LOCAL TENANT '<id>'` transaction-local override (consulted
162///      only when the current connection has an open transaction)
163///   3. `SET TENANT '<id>'` session-level thread-local
164///   4. `None` (deny-default for RLS).
165///
166/// The transaction-local layer is read through the runtime; an embedded
167/// helper crate that has no `RedDBRuntime` access still gets correct
168/// behaviour for layers 1, 3, and 4.
169pub fn current_tenant() -> Option<String> {
170    let inherited = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
171    if let Some(over) = current_scope_override() {
172        if over.tenant.is_active() {
173            return over.tenant.resolve(inherited);
174        }
175    }
176    if let Some(tx_local) = current_tx_local_tenant() {
177        return tx_local;
178    }
179    inherited
180}
181
thread_local! {
    /// Copy of the active connection's `tx_local_tenants` entry for the
    /// current `execute_query` call (`SET LOCAL TENANT`).
    ///
    /// Encoding:
    /// - `None` — no transaction-local override is active;
    /// - `Some(Some(id))` — the tenant is overridden to `id`;
    /// - `Some(None)` — the tenant is overridden to NULL (cleared).
    ///
    /// Installed at the top of every `execute_query` invocation through
    /// `TxLocalTenantGuard` and reset by that guard when the call returns,
    /// so pooled connections cannot leak the override past the statement
    /// that owns it.
    static TX_LOCAL_TENANT: std::cell::RefCell<Option<Option<String>>> =
        const { std::cell::RefCell::new(None) };
}
194
195fn current_tx_local_tenant() -> Option<Option<String>> {
196    TX_LOCAL_TENANT.with(|cell| cell.borrow().clone())
197}
198
199/// Recognise `SET LOCAL TENANT '<id>'` / `SET LOCAL TENANT NULL` —
200/// returns `Ok(Some(Some(id)))` for an explicit value, `Ok(Some(None))`
201/// for an explicit NULL clear, `Ok(None)` when the input is not a
202/// `SET LOCAL TENANT` statement at all, and `Err` when the prefix
203/// matches but the value is malformed.
204pub(crate) fn parse_set_local_tenant(query: &str) -> RedDBResult<Option<Option<String>>> {
205    let mut tokens = query.split_ascii_whitespace();
206    let Some(w1) = tokens.next() else {
207        return Ok(None);
208    };
209    if !w1.eq_ignore_ascii_case("SET") {
210        return Ok(None);
211    }
212    let Some(w2) = tokens.next() else {
213        return Ok(None);
214    };
215    if !w2.eq_ignore_ascii_case("LOCAL") {
216        return Ok(None);
217    }
218    let Some(w3) = tokens.next() else {
219        return Ok(None);
220    };
221    if !w3.eq_ignore_ascii_case("TENANT") {
222        return Ok(None);
223    }
224    let rest: String = tokens.collect::<Vec<_>>().join(" ");
225    let rest = rest.trim().trim_end_matches(';').trim();
226    let value_str = rest.strip_prefix('=').map(|s| s.trim()).unwrap_or(rest);
227    if value_str.is_empty() {
228        return Err(RedDBError::Query(
229            "SET LOCAL TENANT expects a string literal or NULL".to_string(),
230        ));
231    }
232    if value_str.eq_ignore_ascii_case("NULL") {
233        return Ok(Some(None));
234    }
235    if value_str.starts_with('\'') && value_str.ends_with('\'') && value_str.len() >= 2 {
236        let inner = &value_str[1..value_str.len() - 1];
237        return Ok(Some(Some(inner.to_string())));
238    }
239    Err(RedDBError::Query(format!(
240        "SET LOCAL TENANT expects a string literal or NULL, got `{value_str}`"
241    )))
242}
243
244pub(crate) struct TxLocalTenantGuard;
245
246impl TxLocalTenantGuard {
247    pub fn install(value: Option<Option<String>>) -> Self {
248        TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = value);
249        Self
250    }
251}
252
253impl Drop for TxLocalTenantGuard {
254    fn drop(&mut self) {
255        TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = None);
256    }
257}
258
259thread_local! {
260    /// Stack of `WITHIN ... <stmt>` overrides active on the current
261    /// thread. Every entry corresponds to one in-flight `execute_query`
262    /// call that started with a `WITHIN` prefix; the entry is pushed
263    /// before dispatch and popped before the call returns. The stack
264    /// shape supports nested invocations (e.g. a view body that itself
265    /// re-enters execute_query).
266    static SCOPE_OVERRIDES: std::cell::RefCell<Vec<crate::runtime::within_clause::ScopeOverride>> =
267        const { std::cell::RefCell::new(Vec::new()) };
268}
269
270pub(crate) fn push_scope_override(over: crate::runtime::within_clause::ScopeOverride) {
271    SCOPE_OVERRIDES.with(|cell| cell.borrow_mut().push(over));
272}
273
274pub(crate) fn pop_scope_override() {
275    SCOPE_OVERRIDES.with(|cell| {
276        cell.borrow_mut().pop();
277    });
278}
279
280pub(crate) fn current_scope_override() -> Option<crate::runtime::within_clause::ScopeOverride> {
281    SCOPE_OVERRIDES.with(|cell| cell.borrow().last().cloned())
282}
283
284/// Cheap probe: is any `WITHIN …` scope override active on this
285/// thread? The fast-path needs to know without paying for the full
286/// `.last().cloned()` allocation — just peek at stack length.
287pub(crate) fn has_scope_override_active() -> bool {
288    SCOPE_OVERRIDES.with(|cell| !cell.borrow().is_empty())
289}
290
291/// RAII guard pairing `push_scope_override` with the matching pop, so
292/// the stack stays balanced even when the inner `execute_query` returns
293/// early via `?`.
294pub(crate) struct ScopeOverrideGuard;
295
296impl ScopeOverrideGuard {
297    pub fn install(over: crate::runtime::within_clause::ScopeOverride) -> Self {
298        push_scope_override(over);
299        Self
300    }
301}
302
303impl Drop for ScopeOverrideGuard {
304    fn drop(&mut self) {
305        pop_scope_override();
306    }
307}
308
309/// Read the current-thread auth identity, honouring per-statement
310/// `WITHIN ... USER '<u>' AS ROLE '<r>'` overrides. The override only
311/// supplies projected strings — it never grants additional privilege —
312/// so callers that need to make authorisation decisions must read from
313/// the underlying `current_auth_identity()` directly.
314pub(crate) fn current_user_projected() -> Option<String> {
315    let inherited = current_auth_identity().map(|(u, _)| u);
316    if let Some(over) = current_scope_override() {
317        if over.user.is_active() {
318            return over.user.resolve(inherited);
319        }
320    }
321    inherited
322}
323
324pub(crate) fn current_role_projected() -> Option<String> {
325    let inherited = current_auth_identity().map(|(_, r)| format!("{r:?}").to_lowercase());
326    if let Some(over) = current_scope_override() {
327        if over.role.is_active() {
328            return over.role.resolve(inherited);
329        }
330    }
331    inherited
332}
333
334pub(crate) fn current_secret_value(path: &str) -> Option<String> {
335    let key = path.to_ascii_lowercase();
336    CURRENT_SECRET_RESOLVER.with(|cell| {
337        let mut resolver = cell.borrow_mut();
338        let resolver = resolver.as_mut()?;
339        if resolver.values.is_none() {
340            resolver.values = resolver
341                .store
342                .as_ref()
343                .map(|store| store.vault_kv_snapshot());
344        }
345        let values = resolver.values.as_ref()?;
346        values.get(&key).cloned().or_else(|| {
347            key.strip_prefix("red.vault/").and_then(|rest| {
348                values
349                    .get(rest)
350                    .cloned()
351                    .or_else(|| values.get(&format!("red.secret.{rest}")).cloned())
352            })
353        })
354    })
355}
356
357struct SecretResolver {
358    store: Option<Arc<crate::auth::store::AuthStore>>,
359    values: Option<HashMap<String, String>>,
360}
361
362pub(crate) struct SecretStoreGuard {
363    previous: Option<SecretResolver>,
364}
365
366impl SecretStoreGuard {
367    pub(super) fn install(store: Option<Arc<crate::auth::store::AuthStore>>) -> Self {
368        let previous = CURRENT_SECRET_RESOLVER.with(|cell| {
369            cell.replace(Some(SecretResolver {
370                store,
371                values: None,
372            }))
373        });
374        Self { previous }
375    }
376}
377
378impl Drop for SecretStoreGuard {
379    fn drop(&mut self) {
380        let previous = self.previous.take();
381        CURRENT_SECRET_RESOLVER.with(|cell| {
382            cell.replace(previous);
383        });
384    }
385}
386
387pub(crate) fn current_config_value(path: &str) -> Option<Value> {
388    let key = path.to_ascii_lowercase();
389    CURRENT_CONFIG_RESOLVER.with(|cell| {
390        let mut resolver = cell.borrow_mut();
391        let resolver = resolver.as_mut()?;
392        if resolver.values.is_none() {
393            resolver.values = Some(latest_config_snapshot(&resolver.db));
394        }
395        let values = resolver.values.as_ref()?;
396        values.get(&key).cloned().or_else(|| {
397            key.strip_prefix("red.config/")
398                .and_then(|rest| values.get(&format!("red.config.{rest}")).cloned())
399        })
400    })
401}
402
403pub(crate) fn update_current_config_value(path: &str, value: Value) {
404    let key = path.to_ascii_lowercase();
405    CURRENT_CONFIG_RESOLVER.with(|cell| {
406        if let Some(resolver) = cell.borrow_mut().as_mut() {
407            if let Some(values) = resolver.values.as_mut() {
408                values.insert(key, value);
409            }
410        }
411    });
412}
413
414pub(crate) fn update_current_secret_value(path: &str, value: Option<String>) {
415    let key = path.to_ascii_lowercase();
416    CURRENT_SECRET_RESOLVER.with(|cell| {
417        if let Some(resolver) = cell.borrow_mut().as_mut() {
418            let Some(values) = resolver.values.as_mut() else {
419                return;
420            };
421            match value {
422                Some(value) => {
423                    values.insert(key, value);
424                }
425                None => {
426                    values.remove(&key);
427                }
428            }
429        }
430    });
431}
432
433fn latest_config_snapshot(db: &RedDB) -> HashMap<String, Value> {
434    let mut latest: HashMap<String, (u64, Value)> = HashMap::new();
435
436    if let Some(manager) = db.store().get_collection("red_config") {
437        manager.for_each_entity(|entity| {
438            let Some(row) = entity.data.as_row() else {
439                return true;
440            };
441            let Some(Value::Text(key)) = row.get_field("key") else {
442                return true;
443            };
444            let value = row.get_field("value").cloned().unwrap_or(Value::Null);
445            let id = entity.id.raw();
446            let key = key.to_ascii_lowercase();
447            insert_latest_config_value(&mut latest, key.clone(), id, value.clone());
448            if let Some(rest) = key.strip_prefix("red.config.") {
449                insert_latest_config_value(&mut latest, format!("red.config/{rest}"), id, value);
450            }
451            true
452        });
453    }
454
455    if let Some(manager) = db.store().get_collection("red.config") {
456        manager.for_each_entity(|entity| {
457            let Some(row) = entity.data.as_row() else {
458                return true;
459            };
460            if matches!(row.get_field("tombstone"), Some(Value::Boolean(true))) {
461                return true;
462            }
463            let Some(Value::Text(key)) = row.get_field("key") else {
464                return true;
465            };
466            let value = row.get_field("value").cloned().unwrap_or(Value::Null);
467            insert_latest_config_value(
468                &mut latest,
469                format!("red.config/{}", key.to_ascii_lowercase()),
470                entity.id.raw(),
471                value,
472            );
473            true
474        });
475    }
476
477    latest
478        .into_iter()
479        .map(|(key, (_, value))| (key, value))
480        .collect()
481}
482
/// Record `value` for `key` unless a row with a higher entity id already
/// claimed the key. On an equal id the later call replaces the earlier one,
/// so (presuming ids grow with insertion order) the newest row wins.
///
/// Generic over the value type so it works independently of `Value`;
/// existing callers infer `V = Value`.
fn insert_latest_config_value<V>(
    latest: &mut HashMap<String, (u64, V)>,
    key: String,
    id: u64,
    value: V,
) {
    use std::collections::hash_map::Entry;

    // Single hash lookup via the entry API instead of `get` followed by `insert`.
    match latest.entry(key) {
        Entry::Occupied(mut slot) => {
            if slot.get().0 <= id {
                slot.insert((id, value));
            }
        }
        Entry::Vacant(slot) => {
            slot.insert((id, value));
        }
    }
}
496
497struct ConfigResolver {
498    db: Arc<RedDB>,
499    values: Option<HashMap<String, Value>>,
500}
501
502pub(crate) struct ConfigSnapshotGuard {
503    previous: Option<ConfigResolver>,
504}
505
506impl ConfigSnapshotGuard {
507    pub(super) fn install(db: Arc<RedDB>) -> Self {
508        let previous = CURRENT_CONFIG_RESOLVER
509            .with(|cell| cell.replace(Some(ConfigResolver { db, values: None })));
510        Self { previous }
511    }
512}
513
514impl Drop for ConfigSnapshotGuard {
515    fn drop(&mut self) {
516        let previous = self.previous.take();
517        CURRENT_CONFIG_RESOLVER.with(|cell| {
518            cell.replace(previous);
519        });
520    }
521}
522
523/// Install the MVCC snapshot used by the current thread for the duration
524/// of one statement. Paired with `clear_current_snapshot()` — callers
525/// should prefer the `CurrentSnapshotGuard` RAII wrapper so early returns
526/// still clean up.
527pub fn set_current_snapshot(ctx: SnapshotContext) {
528    CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = Some(ctx));
529    HAS_SNAPSHOT.with(|c| c.set(true));
530}
531
532pub fn clear_current_snapshot() {
533    CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = None);
534    HAS_SNAPSHOT.with(|c| c.set(false));
535}
536
537/// Drop-guard that restores the previous snapshot on scope exit. Safe to
538/// nest — each statement saves the caller's snapshot and puts it back
539/// instead of blindly clearing, so a top-level `execute_query` called
540/// from inside another statement dispatch (e.g. vector source subqueries)
541/// doesn't strip visibility from the outer scan.
542pub(crate) struct CurrentSnapshotGuard {
543    previous: Option<SnapshotContext>,
544}
545
546impl CurrentSnapshotGuard {
547    pub(crate) fn install(ctx: SnapshotContext) -> Self {
548        let previous = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
549        set_current_snapshot(ctx);
550        Self { previous }
551    }
552}
553
554impl Drop for CurrentSnapshotGuard {
555    fn drop(&mut self) {
556        let prev = self.previous.take();
557        let has = prev.is_some();
558        CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = prev);
559        HAS_SNAPSHOT.with(|c| c.set(has));
560    }
561}
562
563/// Is this entity visible under the current thread's MVCC snapshot?
564///
565/// Returns `true` (no filtering) when no snapshot is installed — that
566/// path is used by embedded callers and by operations that intentionally
567/// bypass MVCC (VACUUM, snapshot export, admin introspection).
568///
569/// When a snapshot is installed the result is
570///   `snapshot.sees(xmin, xmax) && !mgr.is_aborted(xmin) && !xmax_half_abort`
571/// where `xmax_half_abort` re-grants visibility for tuples whose
572/// deleting transaction rolled back.
573#[inline]
574pub fn entity_visible_under_current_snapshot(
575    entity: &crate::storage::unified::entity::UnifiedEntity,
576) -> bool {
577    // Moderation visibility gate (#1274, ADR 0057). A row carrying the
578    // moderation status marker — quarantine-pending or rejected-tombstone
579    // — is hidden from every normal read, on top of MVCC visibility. The
580    // marker lives on the row itself, so the check is a single field probe
581    // and rides the existing per-row visibility chokepoint rather than
582    // adding a separate filter pass to each scan call-site.
583    if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
584        return false;
585    }
586    // Fast path — one `Cell<bool>` read, no RefCell borrow. Autocommit
587    // reads (no active MVCC transaction) still hide superseded physical
588    // versions while avoiding a full snapshot-context lookup.
589    // This runs on every row of every scan; the slow path only fires
590    // inside an explicit transaction.
591    if !HAS_SNAPSHOT.with(|c| c.get()) {
592        return entity.xmax == 0;
593    }
594    CURRENT_SNAPSHOT.with(|cell| {
595        let guard = cell.borrow();
596        let Some(ctx) = guard.as_ref() else {
597            return true;
598        };
599        visibility_check(ctx, entity.xmin, entity.xmax)
600    })
601}
602
603/// Direct visibility check from raw `(xmin, xmax)` — bypasses the
604/// entity borrow for callers that already decomposed the tuple (e.g.
605/// pre-materialized scan caches). Same semantics as
606/// `entity_visible_under_current_snapshot`.
607#[inline]
608pub(crate) fn xids_visible_under_current_snapshot(xmin: u64, xmax: u64) -> bool {
609    if !HAS_SNAPSHOT.with(|c| c.get()) {
610        return true;
611    }
612    CURRENT_SNAPSHOT.with(|cell| {
613        let guard = cell.borrow();
614        let Some(ctx) = guard.as_ref() else {
615            return true;
616        };
617        visibility_check(ctx, xmin, xmax)
618    })
619}
620
621/// Clone the current thread's snapshot context. Parallel scan paths
622/// (`query_all_zoned` with `std::thread::scope`) call this on the main
623/// thread *before* spawning workers so the captured `SnapshotContext`
624/// can be moved into every worker closure. Worker threads do not
625/// inherit thread-locals, so calling `entity_visible_under_current_snapshot`
626/// from inside a spawned closure would silently skip the filter.
627pub fn capture_current_snapshot() -> Option<SnapshotContext> {
628    CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone())
629}
630
631/// Whether the active read snapshot may need historical tuple versions
632/// that the current secondary indexes cannot prove. Index paths can still
633/// recheck visible candidates, but only a heap scan can discover versions
634/// whose indexed value was changed or deleted after this snapshot.
635pub(crate) fn current_snapshot_requires_index_fallback() -> bool {
636    if !HAS_SNAPSHOT.with(|c| c.get()) {
637        return false;
638    }
639    CURRENT_SNAPSHOT.with(|cell| {
640        cell.borrow()
641            .as_ref()
642            .is_some_and(|ctx| ctx.requires_index_fallback)
643    })
644}
645
646/// Frozen MVCC + identity context for callers that need to reinstall
647/// the same view across thread-local boundaries — long-lived cursors,
648/// background batchers, anything that detaches from the dispatch path
649/// and re-enters later.
650///
651/// The bundle bakes in the three thread-locals every read path
652/// consults: `SnapshotContext` (MVCC visibility), the auth identity
653/// (RLS policy gate), and the tenant id (RLS scope). A FETCH that
654/// reinstalls the bundle sees exactly the same rows as the DECLARE
655/// would have, regardless of writes that landed in between.
656///
657/// Cheap to clone — `SnapshotContext` is a clone of three
658/// `Arc`-backed fields, identity is a `(String, Role)`, tenant is a
659/// `String`. None of these contend with the read path.
660#[derive(Clone, Default)]
661pub struct SnapshotBundle {
662    pub snapshot: Option<SnapshotContext>,
663    pub auth: Option<(String, crate::auth::Role)>,
664    pub tenant: Option<String>,
665}
666
667/// Capture the three read-path thread-locals into a `SnapshotBundle`.
668/// Pairs with `with_snapshot_bundle` for re-entry.
669pub fn snapshot_bundle() -> SnapshotBundle {
670    SnapshotBundle {
671        snapshot: capture_current_snapshot(),
672        auth: current_auth_identity(),
673        tenant: CURRENT_TENANT_ID.with(|cell| cell.borrow().clone()),
674    }
675}
676
677/// Reinstall a captured `SnapshotBundle` for the duration of `f`.
678/// Restores the caller's previous thread-locals on exit (panic-safe via
679/// the explicit guard struct so a panic in `f` cannot leak the
680/// installed identity into the worker's next request).
681pub fn with_snapshot_bundle<R>(bundle: &SnapshotBundle, f: impl FnOnce() -> R) -> R {
682    struct Guard {
683        prev_snapshot: Option<SnapshotContext>,
684        prev_auth: Option<(String, crate::auth::Role)>,
685        prev_tenant: Option<String>,
686    }
687    impl Drop for Guard {
688        fn drop(&mut self) {
689            let snap = self.prev_snapshot.take();
690            let has = snap.is_some();
691            CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = snap);
692            HAS_SNAPSHOT.with(|c| c.set(has));
693            CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = self.prev_auth.take());
694            CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = self.prev_tenant.take());
695        }
696    }
697
698    let _guard = {
699        let prev_snapshot = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
700        let prev_auth = CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone());
701        let prev_tenant = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
702
703        match bundle.snapshot.clone() {
704            Some(ctx) => set_current_snapshot(ctx),
705            None => clear_current_snapshot(),
706        }
707        CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = bundle.auth.clone());
708        CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = bundle.tenant.clone());
709
710        Guard {
711            prev_snapshot,
712            prev_auth,
713            prev_tenant,
714        }
715    };
716    f()
717}
718
719/// Apply the same visibility rules used by the thread-local helpers
720/// against a caller-provided context. Intended for parallel workers
721/// that captured the snapshot with `capture_current_snapshot()`.
722#[inline]
723pub fn entity_visible_with_context(
724    ctx: Option<&SnapshotContext>,
725    entity: &crate::storage::unified::entity::UnifiedEntity,
726) -> bool {
727    // Same moderation visibility gate as the thread-local path (#1274):
728    // parallel scan workers capture the snapshot context but must apply
729    // the moderation marker check identically.
730    if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
731        return false;
732    }
733    match ctx {
734        Some(ctx) => visibility_check(ctx, entity.xmin, entity.xmax),
735        None => true,
736    }
737}
738
739#[inline]
740fn visibility_check(ctx: &SnapshotContext, xmin: u64, xmax: u64) -> bool {
741    // Writer aborted → tuple never existed from any future reader's view.
742    // Checked *before* the own-xids fast path so an aborted own-sub-xid
743    // (rolled-back savepoint) stays hidden from the parent.
744    if xmin != 0 && ctx.manager.is_aborted(xmin) {
745        return false;
746    }
747    // Deleter aborted → treat xmax as unset; fall back to xmin-only check.
748    let effective_xmax = if xmax != 0 && ctx.manager.is_aborted(xmax) {
749        0
750    } else {
751        xmax
752    };
753    // Phase 2.3.2e: own-tx writes are always visible to the connection
754    // that stamped them, even when xmin/xmax exceed `snapshot.xid` (as
755    // happens for sub-xids allocated by SAVEPOINT after BEGIN).
756    let own_xmin = xmin != 0 && ctx.own_xids.contains(&xmin);
757    let own_xmax = effective_xmax != 0 && ctx.own_xids.contains(&effective_xmax);
758    if own_xmax {
759        // This connection deleted the row via this xid — hide it from self.
760        return false;
761    }
762    if own_xmin {
763        return true;
764    }
765    ctx.snapshot.sees(xmin, effective_xmax)
766}