1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use crate::api::{RedDBError, RedDBResult};
10use crate::storage::schema::Value;
11use crate::storage::RedDB;
12
13thread_local! {
14 static CURRENT_CONN_ID: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
18
19 static CURRENT_AUTH_IDENTITY: std::cell::RefCell<Option<(String, crate::auth::Role)>> =
27 const { std::cell::RefCell::new(None) };
28
29 static CURRENT_SNAPSHOT: std::cell::RefCell<Option<SnapshotContext>> =
39 const { std::cell::RefCell::new(None) };
40
41 static HAS_SNAPSHOT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
47
48 static CURRENT_TENANT_ID: std::cell::RefCell<Option<String>> =
58 const { std::cell::RefCell::new(None) };
59
60 static CURRENT_CONFIG_RESOLVER: std::cell::RefCell<Option<ConfigResolver>> =
64 const { std::cell::RefCell::new(None) };
65
66 static CURRENT_SECRET_RESOLVER: std::cell::RefCell<Option<SecretResolver>> =
70 const { std::cell::RefCell::new(None) };
71}
72
73#[derive(Clone)]
88pub struct SnapshotContext {
89 pub snapshot: crate::storage::transaction::snapshot::Snapshot,
90 pub manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
91 pub own_xids: std::collections::HashSet<crate::storage::transaction::snapshot::Xid>,
92 pub requires_index_fallback: bool,
93}
94
95pub fn set_current_connection_id(id: u64) {
104 CURRENT_CONN_ID.with(|c| c.set(id));
105}
106
107pub fn clear_current_connection_id() {
109 CURRENT_CONN_ID.with(|c| c.set(0));
110}
111
112pub fn current_connection_id() -> u64 {
115 CURRENT_CONN_ID.with(|c| c.get())
116}
117
118pub fn set_current_auth_identity(username: String, role: crate::auth::Role) {
122 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = Some((username, role)));
123}
124
125pub fn clear_current_auth_identity() {
129 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = None);
130}
131
132pub(crate) fn current_auth_identity() -> Option<(String, crate::auth::Role)> {
135 CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone())
136}
137
138pub fn current_auth_identity_for_audit() -> Option<(String, crate::auth::Role)> {
142 current_auth_identity()
143}
144
145pub fn set_current_tenant(tenant_id: String) {
150 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = Some(tenant_id));
151}
152
153pub fn clear_current_tenant() {
156 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = None);
157}
158
159pub fn current_tenant() -> Option<String> {
170 let inherited = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
171 if let Some(over) = current_scope_override() {
172 if over.tenant.is_active() {
173 return over.tenant.resolve(inherited);
174 }
175 }
176 if let Some(tx_local) = current_tx_local_tenant() {
177 return tx_local;
178 }
179 inherited
180}
181
182thread_local! {
183 static TX_LOCAL_TENANT: std::cell::RefCell<Option<Option<String>>> =
192 const { std::cell::RefCell::new(None) };
193}
194
195fn current_tx_local_tenant() -> Option<Option<String>> {
196 TX_LOCAL_TENANT.with(|cell| cell.borrow().clone())
197}
198
199pub(crate) fn parse_set_local_tenant(query: &str) -> RedDBResult<Option<Option<String>>> {
205 let mut tokens = query.split_ascii_whitespace();
206 let Some(w1) = tokens.next() else {
207 return Ok(None);
208 };
209 if !w1.eq_ignore_ascii_case("SET") {
210 return Ok(None);
211 }
212 let Some(w2) = tokens.next() else {
213 return Ok(None);
214 };
215 if !w2.eq_ignore_ascii_case("LOCAL") {
216 return Ok(None);
217 }
218 let Some(w3) = tokens.next() else {
219 return Ok(None);
220 };
221 if !w3.eq_ignore_ascii_case("TENANT") {
222 return Ok(None);
223 }
224 let rest: String = tokens.collect::<Vec<_>>().join(" ");
225 let rest = rest.trim().trim_end_matches(';').trim();
226 let value_str = rest.strip_prefix('=').map(|s| s.trim()).unwrap_or(rest);
227 if value_str.is_empty() {
228 return Err(RedDBError::Query(
229 "SET LOCAL TENANT expects a string literal or NULL".to_string(),
230 ));
231 }
232 if value_str.eq_ignore_ascii_case("NULL") {
233 return Ok(Some(None));
234 }
235 if value_str.starts_with('\'') && value_str.ends_with('\'') && value_str.len() >= 2 {
236 let inner = &value_str[1..value_str.len() - 1];
237 return Ok(Some(Some(inner.to_string())));
238 }
239 Err(RedDBError::Query(format!(
240 "SET LOCAL TENANT expects a string literal or NULL, got `{value_str}`"
241 )))
242}
243
244pub(crate) struct TxLocalTenantGuard;
245
246impl TxLocalTenantGuard {
247 pub fn install(value: Option<Option<String>>) -> Self {
248 TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = value);
249 Self
250 }
251}
252
253impl Drop for TxLocalTenantGuard {
254 fn drop(&mut self) {
255 TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = None);
256 }
257}
258
259thread_local! {
260 static SCOPE_OVERRIDES: std::cell::RefCell<Vec<crate::runtime::within_clause::ScopeOverride>> =
267 const { std::cell::RefCell::new(Vec::new()) };
268}
269
270pub(crate) fn push_scope_override(over: crate::runtime::within_clause::ScopeOverride) {
271 SCOPE_OVERRIDES.with(|cell| cell.borrow_mut().push(over));
272}
273
274pub(crate) fn pop_scope_override() {
275 SCOPE_OVERRIDES.with(|cell| {
276 cell.borrow_mut().pop();
277 });
278}
279
280pub(crate) fn current_scope_override() -> Option<crate::runtime::within_clause::ScopeOverride> {
281 SCOPE_OVERRIDES.with(|cell| cell.borrow().last().cloned())
282}
283
284pub(crate) fn has_scope_override_active() -> bool {
288 SCOPE_OVERRIDES.with(|cell| !cell.borrow().is_empty())
289}
290
291pub(crate) struct ScopeOverrideGuard;
295
296impl ScopeOverrideGuard {
297 pub fn install(over: crate::runtime::within_clause::ScopeOverride) -> Self {
298 push_scope_override(over);
299 Self
300 }
301}
302
303impl Drop for ScopeOverrideGuard {
304 fn drop(&mut self) {
305 pop_scope_override();
306 }
307}
308
309pub(crate) fn current_user_projected() -> Option<String> {
315 let inherited = current_auth_identity().map(|(u, _)| u);
316 if let Some(over) = current_scope_override() {
317 if over.user.is_active() {
318 return over.user.resolve(inherited);
319 }
320 }
321 inherited
322}
323
324pub(crate) fn current_role_projected() -> Option<String> {
325 let inherited = current_auth_identity().map(|(_, r)| format!("{r:?}").to_lowercase());
326 if let Some(over) = current_scope_override() {
327 if over.role.is_active() {
328 return over.role.resolve(inherited);
329 }
330 }
331 inherited
332}
333
334pub(crate) fn current_secret_value(path: &str) -> Option<String> {
335 let key = path.to_ascii_lowercase();
336 CURRENT_SECRET_RESOLVER.with(|cell| {
337 let mut resolver = cell.borrow_mut();
338 let resolver = resolver.as_mut()?;
339 if resolver.values.is_none() {
340 resolver.values = resolver
341 .store
342 .as_ref()
343 .map(|store| store.vault_kv_snapshot());
344 }
345 let values = resolver.values.as_ref()?;
346 values.get(&key).cloned().or_else(|| {
347 key.strip_prefix("red.vault/").and_then(|rest| {
348 values
349 .get(rest)
350 .cloned()
351 .or_else(|| values.get(&format!("red.secret.{rest}")).cloned())
352 })
353 })
354 })
355}
356
357struct SecretResolver {
358 store: Option<Arc<crate::auth::store::AuthStore>>,
359 values: Option<HashMap<String, String>>,
360}
361
362pub(crate) struct SecretStoreGuard {
363 previous: Option<SecretResolver>,
364}
365
366impl SecretStoreGuard {
367 pub(super) fn install(store: Option<Arc<crate::auth::store::AuthStore>>) -> Self {
368 let previous = CURRENT_SECRET_RESOLVER.with(|cell| {
369 cell.replace(Some(SecretResolver {
370 store,
371 values: None,
372 }))
373 });
374 Self { previous }
375 }
376}
377
378impl Drop for SecretStoreGuard {
379 fn drop(&mut self) {
380 let previous = self.previous.take();
381 CURRENT_SECRET_RESOLVER.with(|cell| {
382 cell.replace(previous);
383 });
384 }
385}
386
387pub(crate) fn current_config_value(path: &str) -> Option<Value> {
388 let key = path.to_ascii_lowercase();
389 CURRENT_CONFIG_RESOLVER.with(|cell| {
390 let mut resolver = cell.borrow_mut();
391 let resolver = resolver.as_mut()?;
392 if resolver.values.is_none() {
393 resolver.values = Some(latest_config_snapshot(&resolver.db));
394 }
395 let values = resolver.values.as_ref()?;
396 values.get(&key).cloned().or_else(|| {
401 key.strip_prefix("red.config/").and_then(|rest| {
402 values
403 .get(rest)
404 .cloned()
405 .or_else(|| values.get(&format!("red.config.{rest}")).cloned())
406 })
407 })
408 })
409}
410
411pub(crate) fn update_current_config_value(path: &str, value: Value) {
412 let key = path.to_ascii_lowercase();
413 CURRENT_CONFIG_RESOLVER.with(|cell| {
414 if let Some(resolver) = cell.borrow_mut().as_mut() {
415 if let Some(values) = resolver.values.as_mut() {
416 values.insert(key, value);
417 }
418 }
419 });
420}
421
422pub(crate) fn update_current_secret_value(path: &str, value: Option<String>) {
423 let key = path.to_ascii_lowercase();
424 CURRENT_SECRET_RESOLVER.with(|cell| {
425 if let Some(resolver) = cell.borrow_mut().as_mut() {
426 let Some(values) = resolver.values.as_mut() else {
427 return;
428 };
429 match value {
430 Some(value) => {
431 values.insert(key, value);
432 }
433 None => {
434 values.remove(&key);
435 }
436 }
437 }
438 });
439}
440
441fn latest_config_snapshot(db: &RedDB) -> HashMap<String, Value> {
442 let mut latest: HashMap<String, (u64, Value)> = HashMap::new();
443
444 if let Some(manager) = db.store().get_collection("red_config") {
445 manager.for_each_entity(|entity| {
446 let Some(row) = entity.data.as_row() else {
447 return true;
448 };
449 let Some(Value::Text(key)) = row.get_field("key") else {
450 return true;
451 };
452 let value = row.get_field("value").cloned().unwrap_or(Value::Null);
453 let id = entity.id.raw();
454 let key = key.to_ascii_lowercase();
455 insert_latest_config_value(&mut latest, key.clone(), id, value.clone());
456 if let Some(rest) = key.strip_prefix("red.config.") {
457 insert_latest_config_value(&mut latest, format!("red.config/{rest}"), id, value);
458 }
459 true
460 });
461 }
462
463 if let Some(manager) = db.store().get_collection("red.config") {
464 manager.for_each_entity(|entity| {
465 let Some(row) = entity.data.as_row() else {
466 return true;
467 };
468 if matches!(row.get_field("tombstone"), Some(Value::Boolean(true))) {
469 return true;
470 }
471 let Some(Value::Text(key)) = row.get_field("key") else {
472 return true;
473 };
474 let value = row.get_field("value").cloned().unwrap_or(Value::Null);
475 insert_latest_config_value(
476 &mut latest,
477 format!("red.config/{}", key.to_ascii_lowercase()),
478 entity.id.raw(),
479 value,
480 );
481 true
482 });
483 }
484
485 latest
486 .into_iter()
487 .map(|(key, (_, value))| (key, value))
488 .collect()
489}
490
491fn insert_latest_config_value(
492 latest: &mut HashMap<String, (u64, Value)>,
493 key: String,
494 id: u64,
495 value: Value,
496) {
497 match latest.get(&key) {
498 Some((prev_id, _)) if *prev_id > id => {}
499 _ => {
500 latest.insert(key, (id, value));
501 }
502 }
503}
504
505struct ConfigResolver {
506 db: Arc<RedDB>,
507 values: Option<HashMap<String, Value>>,
508}
509
510pub(crate) struct ConfigSnapshotGuard {
511 previous: Option<ConfigResolver>,
512}
513
514impl ConfigSnapshotGuard {
515 pub(super) fn install(db: Arc<RedDB>) -> Self {
516 let previous = CURRENT_CONFIG_RESOLVER
517 .with(|cell| cell.replace(Some(ConfigResolver { db, values: None })));
518 Self { previous }
519 }
520}
521
522impl Drop for ConfigSnapshotGuard {
523 fn drop(&mut self) {
524 let previous = self.previous.take();
525 CURRENT_CONFIG_RESOLVER.with(|cell| {
526 cell.replace(previous);
527 });
528 }
529}
530
531pub fn set_current_snapshot(ctx: SnapshotContext) {
536 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = Some(ctx));
537 HAS_SNAPSHOT.with(|c| c.set(true));
538}
539
540pub fn clear_current_snapshot() {
541 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = None);
542 HAS_SNAPSHOT.with(|c| c.set(false));
543}
544
545pub(crate) struct CurrentSnapshotGuard {
551 previous: Option<SnapshotContext>,
552}
553
554impl CurrentSnapshotGuard {
555 pub(crate) fn install(ctx: SnapshotContext) -> Self {
556 let previous = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
557 set_current_snapshot(ctx);
558 Self { previous }
559 }
560}
561
562impl Drop for CurrentSnapshotGuard {
563 fn drop(&mut self) {
564 let prev = self.previous.take();
565 let has = prev.is_some();
566 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = prev);
567 HAS_SNAPSHOT.with(|c| c.set(has));
568 }
569}
570
571#[inline]
582pub fn entity_visible_under_current_snapshot(
583 entity: &crate::storage::unified::entity::UnifiedEntity,
584) -> bool {
585 if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
592 return false;
593 }
594 if !HAS_SNAPSHOT.with(|c| c.get()) {
600 return entity.xmax == 0;
601 }
602 CURRENT_SNAPSHOT.with(|cell| {
603 let guard = cell.borrow();
604 let Some(ctx) = guard.as_ref() else {
605 return true;
606 };
607 visibility_check(ctx, entity.xmin, entity.xmax)
608 })
609}
610
611#[inline]
616pub(crate) fn xids_visible_under_current_snapshot(xmin: u64, xmax: u64) -> bool {
617 if !HAS_SNAPSHOT.with(|c| c.get()) {
618 return true;
619 }
620 CURRENT_SNAPSHOT.with(|cell| {
621 let guard = cell.borrow();
622 let Some(ctx) = guard.as_ref() else {
623 return true;
624 };
625 visibility_check(ctx, xmin, xmax)
626 })
627}
628
629pub fn capture_current_snapshot() -> Option<SnapshotContext> {
636 CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone())
637}
638
639pub(crate) fn current_snapshot_requires_index_fallback() -> bool {
644 if !HAS_SNAPSHOT.with(|c| c.get()) {
645 return false;
646 }
647 CURRENT_SNAPSHOT.with(|cell| {
648 cell.borrow()
649 .as_ref()
650 .is_some_and(|ctx| ctx.requires_index_fallback)
651 })
652}
653
654#[derive(Clone, Default)]
669pub struct SnapshotBundle {
670 pub snapshot: Option<SnapshotContext>,
671 pub auth: Option<(String, crate::auth::Role)>,
672 pub tenant: Option<String>,
673}
674
675pub fn snapshot_bundle() -> SnapshotBundle {
678 SnapshotBundle {
679 snapshot: capture_current_snapshot(),
680 auth: current_auth_identity(),
681 tenant: CURRENT_TENANT_ID.with(|cell| cell.borrow().clone()),
682 }
683}
684
685pub fn with_snapshot_bundle<R>(bundle: &SnapshotBundle, f: impl FnOnce() -> R) -> R {
690 struct Guard {
691 prev_snapshot: Option<SnapshotContext>,
692 prev_auth: Option<(String, crate::auth::Role)>,
693 prev_tenant: Option<String>,
694 }
695 impl Drop for Guard {
696 fn drop(&mut self) {
697 let snap = self.prev_snapshot.take();
698 let has = snap.is_some();
699 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = snap);
700 HAS_SNAPSHOT.with(|c| c.set(has));
701 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = self.prev_auth.take());
702 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = self.prev_tenant.take());
703 }
704 }
705
706 let _guard = {
707 let prev_snapshot = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
708 let prev_auth = CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone());
709 let prev_tenant = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
710
711 match bundle.snapshot.clone() {
712 Some(ctx) => set_current_snapshot(ctx),
713 None => clear_current_snapshot(),
714 }
715 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = bundle.auth.clone());
716 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = bundle.tenant.clone());
717
718 Guard {
719 prev_snapshot,
720 prev_auth,
721 prev_tenant,
722 }
723 };
724 f()
725}
726
727#[inline]
731pub fn entity_visible_with_context(
732 ctx: Option<&SnapshotContext>,
733 entity: &crate::storage::unified::entity::UnifiedEntity,
734) -> bool {
735 if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
739 return false;
740 }
741 match ctx {
742 Some(ctx) => visibility_check(ctx, entity.xmin, entity.xmax),
743 None => true,
744 }
745}
746
747#[inline]
748fn visibility_check(ctx: &SnapshotContext, xmin: u64, xmax: u64) -> bool {
749 if xmin != 0 && ctx.manager.is_aborted(xmin) {
753 return false;
754 }
755 let effective_xmax = if xmax != 0 && ctx.manager.is_aborted(xmax) {
757 0
758 } else {
759 xmax
760 };
761 let own_xmin = xmin != 0 && ctx.own_xids.contains(&xmin);
765 let own_xmax = effective_xmax != 0 && ctx.own_xids.contains(&effective_xmax);
766 if own_xmax {
767 return false;
769 }
770 if own_xmin {
771 return true;
772 }
773 ctx.snapshot.sees(xmin, effective_xmax)
774}