1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use crate::api::{RedDBError, RedDBResult};
10use crate::storage::schema::Value;
11use crate::storage::RedDB;
12
13thread_local! {
14 static CURRENT_CONN_ID: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
18
19 static CURRENT_AUTH_IDENTITY: std::cell::RefCell<Option<(String, crate::auth::Role)>> =
27 const { std::cell::RefCell::new(None) };
28
29 static CURRENT_SNAPSHOT: std::cell::RefCell<Option<SnapshotContext>> =
39 const { std::cell::RefCell::new(None) };
40
41 static HAS_SNAPSHOT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
47
48 static CURRENT_TENANT_ID: std::cell::RefCell<Option<String>> =
58 const { std::cell::RefCell::new(None) };
59
60 static CURRENT_CONFIG_RESOLVER: std::cell::RefCell<Option<ConfigResolver>> =
64 const { std::cell::RefCell::new(None) };
65
66 static CURRENT_SECRET_RESOLVER: std::cell::RefCell<Option<SecretResolver>> =
70 const { std::cell::RefCell::new(None) };
71}
72
73#[derive(Clone)]
88pub struct SnapshotContext {
89 pub snapshot: crate::storage::transaction::snapshot::Snapshot,
90 pub manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
91 pub own_xids: std::collections::HashSet<crate::storage::transaction::snapshot::Xid>,
92 pub requires_index_fallback: bool,
93}
94
95pub fn set_current_connection_id(id: u64) {
104 CURRENT_CONN_ID.with(|c| c.set(id));
105}
106
107pub fn clear_current_connection_id() {
109 CURRENT_CONN_ID.with(|c| c.set(0));
110}
111
112pub fn current_connection_id() -> u64 {
115 CURRENT_CONN_ID.with(|c| c.get())
116}
117
118pub fn set_current_auth_identity(username: String, role: crate::auth::Role) {
122 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = Some((username, role)));
123}
124
125pub fn clear_current_auth_identity() {
129 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = None);
130}
131
132pub(crate) fn current_auth_identity() -> Option<(String, crate::auth::Role)> {
135 CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone())
136}
137
138pub fn current_auth_identity_for_audit() -> Option<(String, crate::auth::Role)> {
142 current_auth_identity()
143}
144
145pub fn set_current_tenant(tenant_id: String) {
150 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = Some(tenant_id));
151}
152
153pub fn clear_current_tenant() {
156 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = None);
157}
158
159pub fn current_tenant() -> Option<String> {
170 let inherited = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
171 if let Some(over) = current_scope_override() {
172 if over.tenant.is_active() {
173 return over.tenant.resolve(inherited);
174 }
175 }
176 if let Some(tx_local) = current_tx_local_tenant() {
177 return tx_local;
178 }
179 inherited
180}
181
182thread_local! {
183 static TX_LOCAL_TENANT: std::cell::RefCell<Option<Option<String>>> =
192 const { std::cell::RefCell::new(None) };
193}
194
195fn current_tx_local_tenant() -> Option<Option<String>> {
196 TX_LOCAL_TENANT.with(|cell| cell.borrow().clone())
197}
198
199pub(crate) fn parse_set_local_tenant(query: &str) -> RedDBResult<Option<Option<String>>> {
205 let mut tokens = query.split_ascii_whitespace();
206 let Some(w1) = tokens.next() else {
207 return Ok(None);
208 };
209 if !w1.eq_ignore_ascii_case("SET") {
210 return Ok(None);
211 }
212 let Some(w2) = tokens.next() else {
213 return Ok(None);
214 };
215 if !w2.eq_ignore_ascii_case("LOCAL") {
216 return Ok(None);
217 }
218 let Some(w3) = tokens.next() else {
219 return Ok(None);
220 };
221 if !w3.eq_ignore_ascii_case("TENANT") {
222 return Ok(None);
223 }
224 let rest: String = tokens.collect::<Vec<_>>().join(" ");
225 let rest = rest.trim().trim_end_matches(';').trim();
226 let value_str = rest.strip_prefix('=').map(|s| s.trim()).unwrap_or(rest);
227 if value_str.is_empty() {
228 return Err(RedDBError::Query(
229 "SET LOCAL TENANT expects a string literal or NULL".to_string(),
230 ));
231 }
232 if value_str.eq_ignore_ascii_case("NULL") {
233 return Ok(Some(None));
234 }
235 if value_str.starts_with('\'') && value_str.ends_with('\'') && value_str.len() >= 2 {
236 let inner = &value_str[1..value_str.len() - 1];
237 return Ok(Some(Some(inner.to_string())));
238 }
239 Err(RedDBError::Query(format!(
240 "SET LOCAL TENANT expects a string literal or NULL, got `{value_str}`"
241 )))
242}
243
244pub(crate) struct TxLocalTenantGuard;
245
246impl TxLocalTenantGuard {
247 pub fn install(value: Option<Option<String>>) -> Self {
248 TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = value);
249 Self
250 }
251}
252
253impl Drop for TxLocalTenantGuard {
254 fn drop(&mut self) {
255 TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = None);
256 }
257}
258
259thread_local! {
260 static SCOPE_OVERRIDES: std::cell::RefCell<Vec<crate::runtime::within_clause::ScopeOverride>> =
267 const { std::cell::RefCell::new(Vec::new()) };
268}
269
270pub(crate) fn push_scope_override(over: crate::runtime::within_clause::ScopeOverride) {
271 SCOPE_OVERRIDES.with(|cell| cell.borrow_mut().push(over));
272}
273
274pub(crate) fn pop_scope_override() {
275 SCOPE_OVERRIDES.with(|cell| {
276 cell.borrow_mut().pop();
277 });
278}
279
280pub(crate) fn current_scope_override() -> Option<crate::runtime::within_clause::ScopeOverride> {
281 SCOPE_OVERRIDES.with(|cell| cell.borrow().last().cloned())
282}
283
284pub(crate) fn has_scope_override_active() -> bool {
288 SCOPE_OVERRIDES.with(|cell| !cell.borrow().is_empty())
289}
290
291pub(crate) struct ScopeOverrideGuard;
295
296impl ScopeOverrideGuard {
297 pub fn install(over: crate::runtime::within_clause::ScopeOverride) -> Self {
298 push_scope_override(over);
299 Self
300 }
301}
302
303impl Drop for ScopeOverrideGuard {
304 fn drop(&mut self) {
305 pop_scope_override();
306 }
307}
308
309pub(crate) fn current_user_projected() -> Option<String> {
315 let inherited = current_auth_identity().map(|(u, _)| u);
316 if let Some(over) = current_scope_override() {
317 if over.user.is_active() {
318 return over.user.resolve(inherited);
319 }
320 }
321 inherited
322}
323
324pub(crate) fn current_role_projected() -> Option<String> {
325 let inherited = current_auth_identity().map(|(_, r)| format!("{r:?}").to_lowercase());
326 if let Some(over) = current_scope_override() {
327 if over.role.is_active() {
328 return over.role.resolve(inherited);
329 }
330 }
331 inherited
332}
333
334pub(crate) fn current_secret_value(path: &str) -> Option<String> {
335 let key = path.to_ascii_lowercase();
336 CURRENT_SECRET_RESOLVER.with(|cell| {
337 let mut resolver = cell.borrow_mut();
338 let resolver = resolver.as_mut()?;
339 if resolver.values.is_none() {
340 resolver.values = resolver
341 .store
342 .as_ref()
343 .map(|store| store.vault_kv_snapshot());
344 }
345 let values = resolver.values.as_ref()?;
346 values.get(&key).cloned().or_else(|| {
347 key.strip_prefix("red.vault/").and_then(|rest| {
348 values
349 .get(rest)
350 .cloned()
351 .or_else(|| values.get(&format!("red.secret.{rest}")).cloned())
352 })
353 })
354 })
355}
356
357struct SecretResolver {
358 store: Option<Arc<crate::auth::store::AuthStore>>,
359 values: Option<HashMap<String, String>>,
360}
361
362pub(crate) struct SecretStoreGuard {
363 previous: Option<SecretResolver>,
364}
365
366impl SecretStoreGuard {
367 pub(super) fn install(store: Option<Arc<crate::auth::store::AuthStore>>) -> Self {
368 let previous = CURRENT_SECRET_RESOLVER.with(|cell| {
369 cell.replace(Some(SecretResolver {
370 store,
371 values: None,
372 }))
373 });
374 Self { previous }
375 }
376}
377
378impl Drop for SecretStoreGuard {
379 fn drop(&mut self) {
380 let previous = self.previous.take();
381 CURRENT_SECRET_RESOLVER.with(|cell| {
382 cell.replace(previous);
383 });
384 }
385}
386
387pub(crate) fn current_config_value(path: &str) -> Option<Value> {
388 let key = path.to_ascii_lowercase();
389 CURRENT_CONFIG_RESOLVER.with(|cell| {
390 let mut resolver = cell.borrow_mut();
391 let resolver = resolver.as_mut()?;
392 if resolver.values.is_none() {
393 resolver.values = Some(latest_config_snapshot(&resolver.db));
394 }
395 let values = resolver.values.as_ref()?;
396 values.get(&key).cloned().or_else(|| {
397 key.strip_prefix("red.config/")
398 .and_then(|rest| values.get(&format!("red.config.{rest}")).cloned())
399 })
400 })
401}
402
403pub(crate) fn update_current_config_value(path: &str, value: Value) {
404 let key = path.to_ascii_lowercase();
405 CURRENT_CONFIG_RESOLVER.with(|cell| {
406 if let Some(resolver) = cell.borrow_mut().as_mut() {
407 if let Some(values) = resolver.values.as_mut() {
408 values.insert(key, value);
409 }
410 }
411 });
412}
413
414pub(crate) fn update_current_secret_value(path: &str, value: Option<String>) {
415 let key = path.to_ascii_lowercase();
416 CURRENT_SECRET_RESOLVER.with(|cell| {
417 if let Some(resolver) = cell.borrow_mut().as_mut() {
418 let Some(values) = resolver.values.as_mut() else {
419 return;
420 };
421 match value {
422 Some(value) => {
423 values.insert(key, value);
424 }
425 None => {
426 values.remove(&key);
427 }
428 }
429 }
430 });
431}
432
433fn latest_config_snapshot(db: &RedDB) -> HashMap<String, Value> {
434 let mut latest: HashMap<String, (u64, Value)> = HashMap::new();
435
436 if let Some(manager) = db.store().get_collection("red_config") {
437 manager.for_each_entity(|entity| {
438 let Some(row) = entity.data.as_row() else {
439 return true;
440 };
441 let Some(Value::Text(key)) = row.get_field("key") else {
442 return true;
443 };
444 let value = row.get_field("value").cloned().unwrap_or(Value::Null);
445 let id = entity.id.raw();
446 let key = key.to_ascii_lowercase();
447 insert_latest_config_value(&mut latest, key.clone(), id, value.clone());
448 if let Some(rest) = key.strip_prefix("red.config.") {
449 insert_latest_config_value(&mut latest, format!("red.config/{rest}"), id, value);
450 }
451 true
452 });
453 }
454
455 if let Some(manager) = db.store().get_collection("red.config") {
456 manager.for_each_entity(|entity| {
457 let Some(row) = entity.data.as_row() else {
458 return true;
459 };
460 if matches!(row.get_field("tombstone"), Some(Value::Boolean(true))) {
461 return true;
462 }
463 let Some(Value::Text(key)) = row.get_field("key") else {
464 return true;
465 };
466 let value = row.get_field("value").cloned().unwrap_or(Value::Null);
467 insert_latest_config_value(
468 &mut latest,
469 format!("red.config/{}", key.to_ascii_lowercase()),
470 entity.id.raw(),
471 value,
472 );
473 true
474 });
475 }
476
477 latest
478 .into_iter()
479 .map(|(key, (_, value))| (key, value))
480 .collect()
481}
482
483fn insert_latest_config_value(
484 latest: &mut HashMap<String, (u64, Value)>,
485 key: String,
486 id: u64,
487 value: Value,
488) {
489 match latest.get(&key) {
490 Some((prev_id, _)) if *prev_id > id => {}
491 _ => {
492 latest.insert(key, (id, value));
493 }
494 }
495}
496
497struct ConfigResolver {
498 db: Arc<RedDB>,
499 values: Option<HashMap<String, Value>>,
500}
501
502pub(crate) struct ConfigSnapshotGuard {
503 previous: Option<ConfigResolver>,
504}
505
506impl ConfigSnapshotGuard {
507 pub(super) fn install(db: Arc<RedDB>) -> Self {
508 let previous = CURRENT_CONFIG_RESOLVER
509 .with(|cell| cell.replace(Some(ConfigResolver { db, values: None })));
510 Self { previous }
511 }
512}
513
514impl Drop for ConfigSnapshotGuard {
515 fn drop(&mut self) {
516 let previous = self.previous.take();
517 CURRENT_CONFIG_RESOLVER.with(|cell| {
518 cell.replace(previous);
519 });
520 }
521}
522
523pub fn set_current_snapshot(ctx: SnapshotContext) {
528 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = Some(ctx));
529 HAS_SNAPSHOT.with(|c| c.set(true));
530}
531
532pub fn clear_current_snapshot() {
533 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = None);
534 HAS_SNAPSHOT.with(|c| c.set(false));
535}
536
537pub(crate) struct CurrentSnapshotGuard {
543 previous: Option<SnapshotContext>,
544}
545
546impl CurrentSnapshotGuard {
547 pub(crate) fn install(ctx: SnapshotContext) -> Self {
548 let previous = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
549 set_current_snapshot(ctx);
550 Self { previous }
551 }
552}
553
554impl Drop for CurrentSnapshotGuard {
555 fn drop(&mut self) {
556 let prev = self.previous.take();
557 let has = prev.is_some();
558 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = prev);
559 HAS_SNAPSHOT.with(|c| c.set(has));
560 }
561}
562
563#[inline]
574pub fn entity_visible_under_current_snapshot(
575 entity: &crate::storage::unified::entity::UnifiedEntity,
576) -> bool {
577 if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
584 return false;
585 }
586 if !HAS_SNAPSHOT.with(|c| c.get()) {
592 return entity.xmax == 0;
593 }
594 CURRENT_SNAPSHOT.with(|cell| {
595 let guard = cell.borrow();
596 let Some(ctx) = guard.as_ref() else {
597 return true;
598 };
599 visibility_check(ctx, entity.xmin, entity.xmax)
600 })
601}
602
603#[inline]
608pub(crate) fn xids_visible_under_current_snapshot(xmin: u64, xmax: u64) -> bool {
609 if !HAS_SNAPSHOT.with(|c| c.get()) {
610 return true;
611 }
612 CURRENT_SNAPSHOT.with(|cell| {
613 let guard = cell.borrow();
614 let Some(ctx) = guard.as_ref() else {
615 return true;
616 };
617 visibility_check(ctx, xmin, xmax)
618 })
619}
620
621pub fn capture_current_snapshot() -> Option<SnapshotContext> {
628 CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone())
629}
630
631pub(crate) fn current_snapshot_requires_index_fallback() -> bool {
636 if !HAS_SNAPSHOT.with(|c| c.get()) {
637 return false;
638 }
639 CURRENT_SNAPSHOT.with(|cell| {
640 cell.borrow()
641 .as_ref()
642 .is_some_and(|ctx| ctx.requires_index_fallback)
643 })
644}
645
646#[derive(Clone, Default)]
661pub struct SnapshotBundle {
662 pub snapshot: Option<SnapshotContext>,
663 pub auth: Option<(String, crate::auth::Role)>,
664 pub tenant: Option<String>,
665}
666
667pub fn snapshot_bundle() -> SnapshotBundle {
670 SnapshotBundle {
671 snapshot: capture_current_snapshot(),
672 auth: current_auth_identity(),
673 tenant: CURRENT_TENANT_ID.with(|cell| cell.borrow().clone()),
674 }
675}
676
677pub fn with_snapshot_bundle<R>(bundle: &SnapshotBundle, f: impl FnOnce() -> R) -> R {
682 struct Guard {
683 prev_snapshot: Option<SnapshotContext>,
684 prev_auth: Option<(String, crate::auth::Role)>,
685 prev_tenant: Option<String>,
686 }
687 impl Drop for Guard {
688 fn drop(&mut self) {
689 let snap = self.prev_snapshot.take();
690 let has = snap.is_some();
691 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = snap);
692 HAS_SNAPSHOT.with(|c| c.set(has));
693 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = self.prev_auth.take());
694 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = self.prev_tenant.take());
695 }
696 }
697
698 let _guard = {
699 let prev_snapshot = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
700 let prev_auth = CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone());
701 let prev_tenant = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
702
703 match bundle.snapshot.clone() {
704 Some(ctx) => set_current_snapshot(ctx),
705 None => clear_current_snapshot(),
706 }
707 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = bundle.auth.clone());
708 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = bundle.tenant.clone());
709
710 Guard {
711 prev_snapshot,
712 prev_auth,
713 prev_tenant,
714 }
715 };
716 f()
717}
718
719#[inline]
723pub fn entity_visible_with_context(
724 ctx: Option<&SnapshotContext>,
725 entity: &crate::storage::unified::entity::UnifiedEntity,
726) -> bool {
727 if crate::runtime::ai::moderation::entity_moderation_hidden(entity) {
731 return false;
732 }
733 match ctx {
734 Some(ctx) => visibility_check(ctx, entity.xmin, entity.xmax),
735 None => true,
736 }
737}
738
739#[inline]
740fn visibility_check(ctx: &SnapshotContext, xmin: u64, xmax: u64) -> bool {
741 if xmin != 0 && ctx.manager.is_aborted(xmin) {
745 return false;
746 }
747 let effective_xmax = if xmax != 0 && ctx.manager.is_aborted(xmax) {
749 0
750 } else {
751 xmax
752 };
753 let own_xmin = xmin != 0 && ctx.own_xids.contains(&xmin);
757 let own_xmax = effective_xmax != 0 && ctx.own_xids.contains(&effective_xmax);
758 if own_xmax {
759 return false;
761 }
762 if own_xmin {
763 return true;
764 }
765 ctx.snapshot.sees(xmin, effective_xmax)
766}