use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::api::{RedDBError, RedDBResult};
use crate::storage::schema::Value;
use crate::storage::RedDB;
// Per-thread execution context. Every slot below belongs to the thread that
// touches it; the setter/guard functions in this file are the only writers.
thread_local! {
// Connection id for this thread. Starts at 0 and is reset to 0 by
// `clear_current_connection_id`.
static CURRENT_CONN_ID: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
// Username and role stored by `set_current_auth_identity`; `None` until set
// and again after `clear_current_auth_identity`.
static CURRENT_AUTH_IDENTITY: std::cell::RefCell<Option<(String, crate::auth::Role)>> =
const { std::cell::RefCell::new(None) };
// Snapshot context consulted by the visibility helpers in this file.
static CURRENT_SNAPSHOT: std::cell::RefCell<Option<SnapshotContext>> =
const { std::cell::RefCell::new(None) };
// Boolean mirror of "CURRENT_SNAPSHOT is Some", written together with it so
// readers can test a plain `Cell` before borrowing the `RefCell`.
static HAS_SNAPSHOT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
// Tenant stored by `set_current_tenant`. `current_tenant` layers scope
// overrides and the transaction-local tenant on top of this value.
static CURRENT_TENANT_ID: std::cell::RefCell<Option<String>> =
const { std::cell::RefCell::new(None) };
// Config resolver installed by `ConfigSnapshotGuard::install`.
static CURRENT_CONFIG_RESOLVER: std::cell::RefCell<Option<ConfigResolver>> =
const { std::cell::RefCell::new(None) };
// Secret resolver installed by `SecretStoreGuard::install`.
static CURRENT_SECRET_RESOLVER: std::cell::RefCell<Option<SecretResolver>> =
const { std::cell::RefCell::new(None) };
}
/// Snapshot state used by the visibility checks in this file
/// (`visibility_check` and its callers).
#[derive(Clone)]
pub struct SnapshotContext {
/// Snapshot whose `sees(xmin, xmax)` gives the final visibility answer once
/// aborted and own transaction ids have been accounted for.
pub snapshot: crate::storage::transaction::snapshot::Snapshot,
/// Manager queried through `is_aborted` to discount aborted transaction ids.
pub manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
/// Transaction ids treated as the caller's own: a row whose `xmin` is in this
/// set is visible, a row whose (non-aborted) `xmax` is in this set is hidden.
pub own_xids: std::collections::HashSet<crate::storage::transaction::snapshot::Xid>,
/// Flag reported by `current_snapshot_requires_index_fallback`; its consumers
/// live outside this file.
pub requires_index_fallback: bool,
}
/// Stores `id` as the calling thread's current connection id.
pub fn set_current_connection_id(id: u64) {
    CURRENT_CONN_ID.with(|slot| {
        // The previous id is returned by `replace` and intentionally discarded.
        slot.replace(id);
    });
}
/// Resets the calling thread's connection id to `0`, the value the slot
/// starts with.
pub fn clear_current_connection_id() {
    const UNSET: u64 = 0;
    CURRENT_CONN_ID.with(|slot| slot.set(UNSET));
}
/// Returns the connection id recorded for the calling thread (`0` when none
/// has been set or after a clear).
pub fn current_connection_id() -> u64 {
    CURRENT_CONN_ID.with(std::cell::Cell::get)
}
/// Records `username` and `role` as the calling thread's authenticated
/// identity, replacing whatever was stored before.
pub fn set_current_auth_identity(username: String, role: crate::auth::Role) {
    let identity = (username, role);
    CURRENT_AUTH_IDENTITY.with(|slot| {
        slot.replace(Some(identity));
    });
}
/// Removes the calling thread's authenticated identity, leaving the slot at
/// `None`.
pub fn clear_current_auth_identity() {
    CURRENT_AUTH_IDENTITY.with(|slot| {
        slot.replace(None);
    });
}
/// Returns a clone of the `(username, role)` pair stored for the calling
/// thread, or `None` when no identity is set.
pub(crate) fn current_auth_identity() -> Option<(String, crate::auth::Role)> {
    CURRENT_AUTH_IDENTITY.with(|slot| {
        let stored = slot.borrow();
        stored.as_ref().cloned()
    })
}
pub fn current_auth_identity_for_audit() -> Option<(String, crate::auth::Role)> {
current_auth_identity()
}
/// Stores `tenant_id` as the calling thread's tenant, replacing any previous
/// value.
pub fn set_current_tenant(tenant_id: String) {
    CURRENT_TENANT_ID.with(|slot| {
        slot.replace(Some(tenant_id));
    });
}
/// Removes the calling thread's tenant, leaving the slot at `None`.
pub fn clear_current_tenant() {
    CURRENT_TENANT_ID.with(|slot| {
        slot.borrow_mut().take();
    });
}
/// Resolves the tenant in effect for the calling thread.
///
/// Precedence, highest first:
/// 1. the innermost scope override, when its tenant component is active
///    (it is handed the stored tenant to resolve against);
/// 2. the transaction-local tenant, when one is installed (this may itself
///    be `None`);
/// 3. the tenant stored by `set_current_tenant`.
pub fn current_tenant() -> Option<String> {
    let stored = CURRENT_TENANT_ID.with(|slot| slot.borrow().clone());
    match current_scope_override() {
        Some(scope) if scope.tenant.is_active() => scope.tenant.resolve(stored),
        _ => current_tx_local_tenant().unwrap_or(stored),
    }
}
// Transaction-local tenant slot. The outer `Option` says whether a value is
// installed at all; the inner `Option` is the tenant itself, so
// `Some(None)` means "installed, and the tenant is explicitly none".
thread_local! {
static TX_LOCAL_TENANT: std::cell::RefCell<Option<Option<String>>> =
const { std::cell::RefCell::new(None) };
}
/// Returns a clone of the transaction-local tenant slot: `None` when nothing
/// is installed, otherwise `Some(inner)` with the installed (possibly `None`)
/// tenant.
fn current_tx_local_tenant() -> Option<Option<String>> {
    TX_LOCAL_TENANT.with(|slot| {
        let installed = slot.borrow();
        installed.as_ref().cloned()
    })
}
/// Recognises a `SET LOCAL TENANT ...` statement and extracts the tenant it
/// names.
///
/// The three keywords are matched ASCII case-insensitively as
/// whitespace-delimited tokens. After them an optional `=` and a trailing
/// run of `;` are tolerated.
///
/// Returns:
/// - `Ok(None)` when `query` does not start with `SET LOCAL TENANT`;
/// - `Ok(Some(None))` when the value is the keyword `NULL`;
/// - `Ok(Some(Some(t)))` when the value is a single-quoted literal. The text
///   between the outer quotes is returned verbatim: interior whitespace is
///   preserved and embedded quotes are not unescaped.
///
/// # Errors
///
/// Returns `RedDBError::Query` when the value is missing or is neither `NULL`
/// nor a single-quoted literal.
pub(crate) fn parse_set_local_tenant(query: &str) -> RedDBResult<Option<Option<String>>> {
    /// Splits the next ASCII-whitespace-delimited token off `input`, returning
    /// it together with the untouched remainder.
    fn next_token(input: &str) -> Option<(&str, &str)> {
        let input = input.trim_start_matches(|c: char| c.is_ascii_whitespace());
        if input.is_empty() {
            return None;
        }
        let end = input
            .find(|c: char| c.is_ascii_whitespace())
            .unwrap_or(input.len());
        Some(input.split_at(end))
    }

    let mut rest = query;
    for keyword in ["SET", "LOCAL", "TENANT"] {
        let Some((token, tail)) = next_token(rest) else {
            return Ok(None);
        };
        if !token.eq_ignore_ascii_case(keyword) {
            return Ok(None);
        }
        rest = tail;
    }

    // Work on the raw remainder instead of re-joining tokens with single
    // spaces: re-joining would rewrite whitespace inside the quoted tenant
    // and silently change which tenant is selected.
    let rest = rest.trim().trim_end_matches(';').trim();
    let value_str = rest.strip_prefix('=').map(str::trim).unwrap_or(rest);

    if value_str.is_empty() {
        return Err(RedDBError::Query(
            "SET LOCAL TENANT expects a string literal or NULL".to_string(),
        ));
    }
    if value_str.eq_ignore_ascii_case("NULL") {
        return Ok(Some(None));
    }
    // The length check rejects a lone `'`, which both starts and ends with a quote.
    if value_str.len() >= 2 && value_str.starts_with('\'') && value_str.ends_with('\'') {
        let inner = &value_str[1..value_str.len() - 1];
        return Ok(Some(Some(inner.to_string())));
    }
    Err(RedDBError::Query(format!(
        "SET LOCAL TENANT expects a string literal or NULL, got `{value_str}`"
    )))
}
/// Guard that installs a transaction-local tenant value for as long as it is
/// alive.
///
/// Dropping the guard resets the slot to `None`; it does not restore whatever
/// was installed before `install` was called.
pub(crate) struct TxLocalTenantGuard;

impl TxLocalTenantGuard {
    /// Writes `value` into the transaction-local tenant slot and returns the
    /// guard that will clear it again on drop.
    pub fn install(value: Option<Option<String>>) -> Self {
        TX_LOCAL_TENANT.with(|slot| {
            slot.replace(value);
        });
        TxLocalTenantGuard
    }
}

impl Drop for TxLocalTenantGuard {
    fn drop(&mut self) {
        TX_LOCAL_TENANT.with(|slot| {
            slot.replace(None);
        });
    }
}
// Per-thread stack of scope overrides. `push_scope_override` appends,
// `pop_scope_override` removes the most recent entry, and readers only look
// at the last (innermost) element.
thread_local! {
static SCOPE_OVERRIDES: std::cell::RefCell<Vec<crate::runtime::within_clause::ScopeOverride>> =
const { std::cell::RefCell::new(Vec::new()) };
}
/// Pushes `over` onto the calling thread's scope-override stack, making it
/// the innermost override.
pub(crate) fn push_scope_override(over: crate::runtime::within_clause::ScopeOverride) {
    SCOPE_OVERRIDES.with(|stack| {
        let mut stack = stack.borrow_mut();
        stack.push(over);
    });
}
/// Removes the innermost scope override from the calling thread's stack.
/// Does nothing when the stack is already empty.
pub(crate) fn pop_scope_override() {
    SCOPE_OVERRIDES.with(|stack| drop(stack.borrow_mut().pop()));
}
/// Returns a clone of the innermost scope override for the calling thread,
/// or `None` when the stack is empty.
pub(crate) fn current_scope_override() -> Option<crate::runtime::within_clause::ScopeOverride> {
    SCOPE_OVERRIDES.with(|stack| {
        let stack = stack.borrow();
        stack.last().cloned()
    })
}
/// Reports whether at least one scope override is currently on the calling
/// thread's stack.
pub(crate) fn has_scope_override_active() -> bool {
    SCOPE_OVERRIDES.with(|stack| stack.borrow().last().is_some())
}
pub(crate) struct ScopeOverrideGuard;
impl ScopeOverrideGuard {
pub fn install(over: crate::runtime::within_clause::ScopeOverride) -> Self {
push_scope_override(over);
Self
}
}
impl Drop for ScopeOverrideGuard {
fn drop(&mut self) {
pop_scope_override();
}
}
/// Resolves the username in effect for the calling thread.
///
/// Starts from the username of the stored auth identity. When the innermost
/// scope override has an active user component, that component resolves the
/// result (it receives the stored username); otherwise the stored username is
/// returned as is.
pub(crate) fn current_user_projected() -> Option<String> {
    let stored_user = current_auth_identity().map(|(name, _role)| name);
    match current_scope_override() {
        Some(scope) if scope.user.is_active() => scope.user.resolve(stored_user),
        _ => stored_user,
    }
}
/// Resolves the role name in effect for the calling thread.
///
/// Starts from the stored auth identity's role, rendered as its `Debug` text
/// in lowercase. When the innermost scope override has an active role
/// component, that component resolves the result (it receives the stored role
/// name); otherwise the stored role name is returned as is.
pub(crate) fn current_role_projected() -> Option<String> {
    let stored_role = current_auth_identity().map(|(_name, role)| {
        let debug_name = format!("{:?}", role);
        debug_name.to_lowercase()
    });
    match current_scope_override() {
        Some(scope) if scope.role.is_active() => scope.role.resolve(stored_role),
        _ => stored_role,
    }
}
/// Looks up the secret stored under `path` through the calling thread's
/// secret resolver.
///
/// `path` is lowercased (ASCII) before lookup. The secret map is fetched from
/// the resolver's store on first use and cached on the resolver afterwards.
/// Lookup order:
/// 1. the lowercased path itself;
/// 2. for paths starting with `red.vault/`, the remainder after that prefix;
/// 3. for such paths, `red.secret.` followed by the remainder.
///
/// Returns `None` when no resolver is installed, when the resolver has no
/// store to load from, or when none of the keys is present.
pub(crate) fn current_secret_value(path: &str) -> Option<String> {
    let key = path.to_ascii_lowercase();
    CURRENT_SECRET_RESOLVER.with(|slot| {
        let mut installed = slot.borrow_mut();
        let resolver = installed.as_mut()?;

        // Populate the cache lazily; without a store it stays empty and the
        // lookup below bails out with `None`.
        if resolver.values.is_none() {
            if let Some(store) = resolver.store.as_ref() {
                resolver.values = Some(store.vault_kv_snapshot());
            }
        }
        let secrets = resolver.values.as_ref()?;

        if let Some(exact) = secrets.get(&key) {
            return Some(exact.clone());
        }
        let rest = key.strip_prefix("red.vault/")?;
        if let Some(bare) = secrets.get(rest) {
            return Some(bare.clone());
        }
        secrets.get(&format!("red.secret.{rest}")).cloned()
    })
}
/// Per-thread secret lookup state held in `CURRENT_SECRET_RESOLVER`.
struct SecretResolver {
/// Auth store whose `vault_kv_snapshot()` supplies the secret map; with
/// `None` the cache is never filled and lookups return nothing.
store: Option<Arc<crate::auth::store::AuthStore>>,
/// Cached secret map, filled on first lookup by `current_secret_value` and
/// edited in place by `update_current_secret_value`.
values: Option<HashMap<String, String>>,
}
/// Guard that installs a fresh secret resolver for the calling thread and
/// puts the previously installed one back when dropped.
pub(crate) struct SecretStoreGuard {
    /// Resolver that was installed before this guard took over.
    previous: Option<SecretResolver>,
}

impl SecretStoreGuard {
    /// Installs a resolver backed by `store` with an empty cache, remembering
    /// the resolver it displaces.
    pub(super) fn install(store: Option<Arc<crate::auth::store::AuthStore>>) -> Self {
        let fresh = SecretResolver {
            store,
            values: None,
        };
        let previous = CURRENT_SECRET_RESOLVER
            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), Some(fresh)));
        SecretStoreGuard { previous }
    }
}

impl Drop for SecretStoreGuard {
    fn drop(&mut self) {
        let restored = self.previous.take();
        CURRENT_SECRET_RESOLVER.with(|slot| {
            // The resolver this guard installed is dropped after the slot's
            // borrow has been released.
            let displaced = slot.replace(restored);
            drop(displaced);
        });
    }
}
/// Looks up the config value stored under `path` through the calling thread's
/// config resolver.
///
/// `path` is lowercased (ASCII) before lookup. The config map is built with
/// `latest_config_snapshot` on first use and cached on the resolver
/// afterwards. Lookup order:
/// 1. the lowercased path itself;
/// 2. for paths starting with `red.config/`, the key `red.config.` followed
///    by the remainder.
///
/// Returns `None` when no resolver is installed or neither key is present.
pub(crate) fn current_config_value(path: &str) -> Option<Value> {
    let key = path.to_ascii_lowercase();
    CURRENT_CONFIG_RESOLVER.with(|slot| {
        let mut installed = slot.borrow_mut();
        let resolver = installed.as_mut()?;

        // Build the snapshot once per installed resolver.
        let db = &resolver.db;
        let values = resolver
            .values
            .get_or_insert_with(|| latest_config_snapshot(db));

        if let Some(exact) = values.get(&key) {
            return Some(exact.clone());
        }
        let rest = key.strip_prefix("red.config/")?;
        values.get(&format!("red.config.{rest}")).cloned()
    })
}
/// Overwrites one entry in the calling thread's cached config snapshot.
///
/// `path` is lowercased (ASCII) and `value` is stored under it. When the key
/// has the form `red.config.<rest>`, the same value is also stored under
/// `red.config/<rest>`: `latest_config_snapshot` creates that alias for every
/// such key, and leaving it untouched would make lookups through the slash
/// spelling keep returning the old value.
///
/// Does nothing when no resolver is installed or its snapshot has not been
/// built yet (the next lookup will build it from the store).
pub(crate) fn update_current_config_value(path: &str, value: Value) {
    let key = path.to_ascii_lowercase();
    CURRENT_CONFIG_RESOLVER.with(|cell| {
        let mut installed = cell.borrow_mut();
        let Some(values) = installed
            .as_mut()
            .and_then(|resolver| resolver.values.as_mut())
        else {
            return;
        };
        // Keep the slash alias in step with the dotted key.
        if let Some(rest) = key.strip_prefix("red.config.") {
            values.insert(format!("red.config/{rest}"), value.clone());
        }
        values.insert(key, value);
    });
}
/// Edits one entry in the calling thread's cached secret map.
///
/// `path` is lowercased (ASCII). `Some(value)` stores the value under that
/// key; `None` removes the key. Only the exact key is touched.
///
/// Does nothing when no resolver is installed or its cache has not been
/// filled yet.
pub(crate) fn update_current_secret_value(path: &str, value: Option<String>) {
    let key = path.to_ascii_lowercase();
    CURRENT_SECRET_RESOLVER.with(|slot| {
        let mut installed = slot.borrow_mut();
        let cache = installed
            .as_mut()
            .and_then(|resolver| resolver.values.as_mut());
        if let Some(cache) = cache {
            if let Some(secret) = value {
                cache.insert(key, secret);
            } else {
                cache.remove(&key);
            }
        }
    });
}
/// Builds a flat `key -> value` map from the two config collections,
/// keeping, for every key, the value from the entity with the highest raw id.
///
/// * `red_config`: every row with a text `key` field contributes its
///   lowercased key; keys of the form `red.config.<rest>` are additionally
///   registered under the alias `red.config/<rest>`.
/// * `red.config`: every row with a text `key` field contributes
///   `red.config/<lowercased key>`, unless its `tombstone` field is the
///   boolean `true`.
///
/// Entities that are not rows, or rows without a text `key`, are ignored. A
/// missing `value` field is read as `Value::Null`.
///
/// NOTE(review): a tombstoned `red.config` row is simply skipped, so it does
/// not displace an older live row for the same key — confirm that is the
/// intended deletion semantics.
fn latest_config_snapshot(db: &RedDB) -> HashMap<String, Value> {
    let mut newest: HashMap<String, (u64, Value)> = HashMap::new();

    if let Some(collection) = db.store().get_collection("red_config") {
        collection.for_each_entity(|entity| {
            if let Some(row) = entity.data.as_row() {
                if let Some(Value::Text(raw_key)) = row.get_field("key") {
                    let stored = match row.get_field("value") {
                        Some(found) => found.clone(),
                        None => Value::Null,
                    };
                    let id = entity.id.raw();
                    let dotted = raw_key.to_ascii_lowercase();
                    let alias = dotted
                        .strip_prefix("red.config.")
                        .map(|rest| format!("red.config/{rest}"));
                    match alias {
                        Some(alias) => {
                            insert_latest_config_value(&mut newest, dotted, id, stored.clone());
                            insert_latest_config_value(&mut newest, alias, id, stored);
                        }
                        None => insert_latest_config_value(&mut newest, dotted, id, stored),
                    }
                }
            }
            // Always keep iterating.
            true
        });
    }

    if let Some(collection) = db.store().get_collection("red.config") {
        collection.for_each_entity(|entity| {
            if let Some(row) = entity.data.as_row() {
                let tombstoned =
                    matches!(row.get_field("tombstone"), Some(Value::Boolean(true)));
                if !tombstoned {
                    if let Some(Value::Text(raw_key)) = row.get_field("key") {
                        let stored = match row.get_field("value") {
                            Some(found) => found.clone(),
                            None => Value::Null,
                        };
                        insert_latest_config_value(
                            &mut newest,
                            format!("red.config/{}", raw_key.to_ascii_lowercase()),
                            entity.id.raw(),
                            stored,
                        );
                    }
                }
            }
            // Always keep iterating.
            true
        });
    }

    // Drop the ids now that the winners are known.
    let mut flattened = HashMap::with_capacity(newest.len());
    for (key, (_id, value)) in newest {
        flattened.insert(key, value);
    }
    flattened
}
/// Records `(id, value)` for `key` unless the map already holds an entry for
/// that key with a strictly greater id. An entry with the same id is
/// overwritten.
fn insert_latest_config_value(
    latest: &mut HashMap<String, (u64, Value)>,
    key: String,
    id: u64,
    value: Value,
) {
    let superseded = latest
        .get(&key)
        .is_some_and(|(existing_id, _)| *existing_id > id);
    if !superseded {
        latest.insert(key, (id, value));
    }
}
/// Per-thread config lookup state held in `CURRENT_CONFIG_RESOLVER`.
struct ConfigResolver {
/// Database handle passed to `latest_config_snapshot` when the cache is built.
db: Arc<RedDB>,
/// Cached config map, built on first lookup by `current_config_value` and
/// edited in place by `update_current_config_value`.
values: Option<HashMap<String, Value>>,
}
/// Guard that installs a fresh config resolver for the calling thread and
/// puts the previously installed one back when dropped.
pub(crate) struct ConfigSnapshotGuard {
    /// Resolver that was installed before this guard took over.
    previous: Option<ConfigResolver>,
}

impl ConfigSnapshotGuard {
    /// Installs a resolver for `db` with an empty cache, remembering the
    /// resolver it displaces.
    pub(super) fn install(db: Arc<RedDB>) -> Self {
        let fresh = ConfigResolver { db, values: None };
        let previous = CURRENT_CONFIG_RESOLVER
            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), Some(fresh)));
        ConfigSnapshotGuard { previous }
    }
}

impl Drop for ConfigSnapshotGuard {
    fn drop(&mut self) {
        let restored = self.previous.take();
        CURRENT_CONFIG_RESOLVER.with(|slot| {
            // The resolver this guard installed is dropped after the slot's
            // borrow has been released.
            let displaced = slot.replace(restored);
            drop(displaced);
        });
    }
}
/// Installs `ctx` as the calling thread's snapshot context and raises the
/// `HAS_SNAPSHOT` flag.
pub fn set_current_snapshot(ctx: SnapshotContext) {
    CURRENT_SNAPSHOT.with(|slot| {
        slot.replace(Some(ctx));
    });
    HAS_SNAPSHOT.with(|flag| flag.set(true));
}
/// Removes the calling thread's snapshot context and lowers the
/// `HAS_SNAPSHOT` flag.
pub fn clear_current_snapshot() {
    CURRENT_SNAPSHOT.with(|slot| {
        slot.borrow_mut().take();
    });
    HAS_SNAPSHOT.with(|flag| flag.set(false));
}
/// Guard that installs a snapshot context for the calling thread and restores
/// the previously installed one (or none) when dropped.
pub(crate) struct CurrentSnapshotGuard {
    /// Snapshot context that was installed before this guard took over.
    previous: Option<SnapshotContext>,
}

impl CurrentSnapshotGuard {
    /// Remembers the currently installed snapshot context, then installs `ctx`.
    pub(crate) fn install(ctx: SnapshotContext) -> Self {
        let previous = CURRENT_SNAPSHOT.with(|slot| slot.borrow().as_ref().cloned());
        set_current_snapshot(ctx);
        CurrentSnapshotGuard { previous }
    }
}

impl Drop for CurrentSnapshotGuard {
    fn drop(&mut self) {
        let restored = self.previous.take();
        // The flag must mirror whether a context is present after the restore.
        let restored_present = restored.is_some();
        CURRENT_SNAPSHOT.with(|slot| {
            *slot.borrow_mut() = restored;
        });
        HAS_SNAPSHOT.with(|flag| flag.set(restored_present));
    }
}
/// Decides whether `entity` is visible to the calling thread.
///
/// * Entities reported hidden by `entity_moderation_hidden` are never visible.
/// * With no snapshot installed, an entity is visible only when its `xmax`
///   is `0`.
/// * With a snapshot installed, the answer comes from `visibility_check` on
///   the entity's `xmin`/`xmax`. If the flag is set but no context is stored,
///   the entity is treated as visible.
#[inline]
pub fn entity_visible_under_current_snapshot(
    entity: &crate::storage::unified::entity::UnifiedEntity,
) -> bool {
    if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
        return false;
    }
    let snapshot_installed = HAS_SNAPSHOT.with(|flag| flag.get());
    if !snapshot_installed {
        return entity.xmax == 0;
    }
    CURRENT_SNAPSHOT.with(|slot| match slot.borrow().as_ref() {
        Some(ctx) => visibility_check(ctx, entity.xmin, entity.xmax),
        None => true,
    })
}
/// Decides whether a row version with the given `xmin`/`xmax` is visible to
/// the calling thread.
///
/// With no snapshot installed (or the flag set but no context stored) every
/// version is reported visible; otherwise the answer comes from
/// `visibility_check`.
#[inline]
pub(crate) fn xids_visible_under_current_snapshot(xmin: u64, xmax: u64) -> bool {
    let snapshot_installed = HAS_SNAPSHOT.with(|flag| flag.get());
    if !snapshot_installed {
        return true;
    }
    CURRENT_SNAPSHOT.with(|slot| match slot.borrow().as_ref() {
        Some(ctx) => visibility_check(ctx, xmin, xmax),
        None => true,
    })
}
/// Returns a clone of the calling thread's snapshot context, or `None` when
/// none is installed.
pub fn capture_current_snapshot() -> Option<SnapshotContext> {
    CURRENT_SNAPSHOT.with(|slot| {
        let installed = slot.borrow();
        installed.as_ref().cloned()
    })
}
/// Reports whether the calling thread has a snapshot context installed whose
/// `requires_index_fallback` flag is set. Returns `false` when no snapshot is
/// installed.
pub(crate) fn current_snapshot_requires_index_fallback() -> bool {
    let snapshot_installed = HAS_SNAPSHOT.with(|flag| flag.get());
    snapshot_installed
        && CURRENT_SNAPSHOT.with(|slot| {
            matches!(slot.borrow().as_ref(), Some(ctx) if ctx.requires_index_fallback)
        })
}
/// Copy of the per-thread snapshot, auth identity and tenant, produced by
/// `snapshot_bundle` and re-installed by `with_snapshot_bundle`.
#[derive(Clone, Default)]
pub struct SnapshotBundle {
/// Snapshot context captured from the source thread, if any.
pub snapshot: Option<SnapshotContext>,
/// `(username, role)` captured from the source thread, if any.
pub auth: Option<(String, crate::auth::Role)>,
/// Tenant captured from the source thread's stored tenant slot, if any.
pub tenant: Option<String>,
}
/// Captures the calling thread's snapshot context, auth identity and stored
/// tenant into a `SnapshotBundle`.
///
/// The tenant is read straight from the stored tenant slot; scope overrides
/// and the transaction-local tenant are not applied here.
pub fn snapshot_bundle() -> SnapshotBundle {
    let snapshot = capture_current_snapshot();
    let auth = current_auth_identity();
    let tenant = CURRENT_TENANT_ID.with(|slot| slot.borrow().clone());
    SnapshotBundle {
        snapshot,
        auth,
        tenant,
    }
}
/// Runs `f` with the snapshot context, auth identity and tenant from `bundle`
/// installed on the calling thread, then restores the values that were in
/// place before the call.
///
/// The restore happens in a `Drop` implementation, so it also runs when `f`
/// unwinds. Returns whatever `f` returns.
pub fn with_snapshot_bundle<R>(bundle: &SnapshotBundle, f: impl FnOnce() -> R) -> R {
    /// Holds the thread's prior context and writes it back on drop.
    struct Restore {
        snapshot: Option<SnapshotContext>,
        auth: Option<(String, crate::auth::Role)>,
        tenant: Option<String>,
    }

    impl Drop for Restore {
        fn drop(&mut self) {
            let snapshot = self.snapshot.take();
            let snapshot_present = snapshot.is_some();
            CURRENT_SNAPSHOT.with(|slot| {
                *slot.borrow_mut() = snapshot;
            });
            HAS_SNAPSHOT.with(|flag| flag.set(snapshot_present));

            let auth = self.auth.take();
            CURRENT_AUTH_IDENTITY.with(|slot| {
                *slot.borrow_mut() = auth;
            });

            let tenant = self.tenant.take();
            CURRENT_TENANT_ID.with(|slot| {
                *slot.borrow_mut() = tenant;
            });
        }
    }

    // Remember what is installed right now.
    let prior_snapshot = CURRENT_SNAPSHOT.with(|slot| slot.borrow().clone());
    let prior_auth = CURRENT_AUTH_IDENTITY.with(|slot| slot.borrow().clone());
    let prior_tenant = CURRENT_TENANT_ID.with(|slot| slot.borrow().clone());

    // Install the bundle's values.
    match &bundle.snapshot {
        Some(ctx) => set_current_snapshot(ctx.clone()),
        None => clear_current_snapshot(),
    }
    CURRENT_AUTH_IDENTITY.with(|slot| {
        *slot.borrow_mut() = bundle.auth.clone();
    });
    CURRENT_TENANT_ID.with(|slot| {
        *slot.borrow_mut() = bundle.tenant.clone();
    });

    // Armed only after installation, so it covers exactly the call to `f`.
    let _restore = Restore {
        snapshot: prior_snapshot,
        auth: prior_auth,
        tenant: prior_tenant,
    };
    f()
}
/// Decides whether `entity` is visible under an explicitly supplied snapshot
/// context instead of the thread-local one.
///
/// Entities reported hidden by `entity_moderation_hidden` are never visible.
/// With `ctx == None` every other entity is visible; otherwise the answer
/// comes from `visibility_check` on the entity's `xmin`/`xmax`.
#[inline]
pub fn entity_visible_with_context(
    ctx: Option<&SnapshotContext>,
    entity: &crate::storage::unified::entity::UnifiedEntity,
) -> bool {
    let hidden = crate::runtime::ai::moderation::entity_moderation_hidden(entity);
    if hidden {
        return false;
    }
    if let Some(ctx) = ctx {
        visibility_check(ctx, entity.xmin, entity.xmax)
    } else {
        true
    }
}
/// Core visibility rule for a row version created by `xmin` and deleted by
/// `xmax` (`0` meaning "not set" for either).
///
/// 1. A version whose creating transaction is aborted is never visible.
/// 2. A deleting transaction that is aborted is ignored (treated as `0`).
/// 3. A version deleted by one of the caller's own transactions is hidden.
/// 4. A version created by one of the caller's own transactions is visible.
/// 5. Otherwise the snapshot's `sees` decides.
#[inline]
fn visibility_check(ctx: &SnapshotContext, xmin: u64, xmax: u64) -> bool {
    let manager = &ctx.manager;
    if xmin != 0 && manager.is_aborted(xmin) {
        return false;
    }

    let mut effective_xmax = xmax;
    if xmax != 0 && manager.is_aborted(xmax) {
        effective_xmax = 0;
    }

    let is_own = |xid: u64| xid != 0 && ctx.own_xids.contains(&xid);
    if is_own(effective_xmax) {
        false
    } else if is_own(xmin) {
        true
    } else {
        ctx.snapshot.sees(xmin, effective_xmax)
    }
}