1use super::*;
2use crate::application::entity::metadata_to_json;
3use crate::auth::column_policy_gate::ColumnAccessRequest;
4use crate::auth::UserId;
5use crate::replication::cdc::ChangeRecord;
6use crate::replication::logical::{ApplyMode, LogicalChangeApplier};
7use crate::storage::query::ast::TableSource;
8
thread_local! {
    /// Identifier of the connection whose work is running on this thread.
    /// `clear_current_connection_id` resets it to `0`.
    static CURRENT_CONN_ID: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };

    /// Authenticated `(username, role)` for the work running on this thread, or
    /// `None` when no identity is installed. Read by `current_auth_identity` and,
    /// through it, by the row-level-security helpers.
    static CURRENT_AUTH_IDENTITY: std::cell::RefCell<Option<(String, crate::auth::Role)>> =
        const { std::cell::RefCell::new(None) };

    /// Transaction snapshot used by the row-visibility checks on this thread.
    /// Paired with `HAS_SNAPSHOT`, which must be updated whenever this changes.
    static CURRENT_SNAPSHOT: std::cell::RefCell<Option<SnapshotContext>> =
        const { std::cell::RefCell::new(None) };

    /// Mirror of `CURRENT_SNAPSHOT.is_some()` held in a plain `Cell` so the
    /// visibility fast path can skip borrowing the `RefCell` when no snapshot
    /// is installed.
    static HAS_SNAPSHOT: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };

    /// Session-level tenant id for this thread. `current_tenant` may replace it
    /// with a scope override (`crate::runtime::within_clause`) or a
    /// transaction-local `SET LOCAL TENANT` value.
    static CURRENT_TENANT_ID: std::cell::RefCell<Option<String>> =
        const { std::cell::RefCell::new(None) };

    /// Lazily populated configuration view. Installed for a scope by
    /// `ConfigSnapshotGuard` and read through `current_config_value`.
    static CURRENT_CONFIG_RESOLVER: std::cell::RefCell<Option<ConfigResolver>> =
        const { std::cell::RefCell::new(None) };

    /// Lazily populated vault/secret view. Installed for a scope by
    /// `SecretStoreGuard` and read through `current_secret_value`.
    static CURRENT_SECRET_RESOLVER: std::cell::RefCell<Option<SecretResolver>> =
        const { std::cell::RefCell::new(None) };
}
68
69fn secret_sql_value_to_string(value: &Value) -> RedDBResult<String> {
70 match value {
71 Value::Text(s) => Ok(s.to_string()),
72 Value::Integer(n) => Ok(n.to_string()),
73 Value::UnsignedInteger(n) => Ok(n.to_string()),
74 Value::Float(n) => Ok(n.to_string()),
75 Value::Boolean(b) => Ok(b.to_string()),
76 Value::Null => Err(RedDBError::Query(
77 "SET SECRET key = NULL deletes the secret; use DELETE SECRET for explicit deletes"
78 .to_string(),
79 )),
80 Value::Password(_) | Value::Secret(_) => Err(RedDBError::Query(
81 "SET SECRET accepts plain scalar literals; PASSWORD() and SECRET() are for typed columns"
82 .to_string(),
83 )),
84 _ => Err(RedDBError::Query(format!(
85 "SET SECRET does not support value type {:?} yet",
86 value.data_type()
87 ))),
88 }
89}
90
91fn system_keyed_collection_contract(
92 name: &str,
93 model: crate::catalog::CollectionModel,
94) -> crate::physical::CollectionContract {
95 let now = crate::utils::now_unix_millis() as u128;
96 crate::physical::CollectionContract {
97 name: name.to_string(),
98 declared_model: model,
99 schema_mode: crate::catalog::SchemaMode::Dynamic,
100 origin: crate::physical::ContractOrigin::Implicit,
101 version: 1,
102 created_at_unix_ms: now,
103 updated_at_unix_ms: now,
104 default_ttl_ms: None,
105 context_index_fields: Vec::new(),
106 declared_columns: Vec::new(),
107 table_def: None,
108 timestamps_enabled: false,
109 context_index_enabled: false,
110 append_only: false,
111 subscriptions: Vec::new(),
112 }
113}
114
/// Transaction snapshot state consulted by `visibility_check` when deciding
/// whether a row version (`xmin`, `xmax`) is visible.
#[derive(Clone)]
pub struct SnapshotContext {
    /// Snapshot whose `sees(xmin, xmax)` decides visibility for rows not
    /// written by one of `own_xids`.
    pub snapshot: crate::storage::transaction::snapshot::Snapshot,
    /// Shared manager, queried via `is_aborted(xid)` to ignore work done by
    /// aborted transactions.
    pub manager: Arc<crate::storage::transaction::snapshot::SnapshotManager>,
    /// Transaction ids belonging to the current transaction. Rows created by
    /// one of these are visible, and rows deleted by one of these are hidden,
    /// regardless of the snapshot.
    pub own_xids: std::collections::HashSet<crate::storage::transaction::snapshot::Xid>,
}
135
136pub fn set_current_connection_id(id: u64) {
145 CURRENT_CONN_ID.with(|c| c.set(id));
146}
147
148pub fn clear_current_connection_id() {
150 CURRENT_CONN_ID.with(|c| c.set(0));
151}
152
153pub fn current_connection_id() -> u64 {
156 CURRENT_CONN_ID.with(|c| c.get())
157}
158
159pub fn set_current_auth_identity(username: String, role: crate::auth::Role) {
163 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = Some((username, role)));
164}
165
166pub fn clear_current_auth_identity() {
170 CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = None);
171}
172
173pub(crate) fn current_auth_identity() -> Option<(String, crate::auth::Role)> {
176 CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone())
177}
178
179pub fn set_current_tenant(tenant_id: String) {
184 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = Some(tenant_id));
185}
186
187pub fn clear_current_tenant() {
190 CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = None);
191}
192
193pub fn current_tenant() -> Option<String> {
204 let inherited = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());
205 if let Some(over) = current_scope_override() {
206 if over.tenant.is_active() {
207 return over.tenant.resolve(inherited);
208 }
209 }
210 if let Some(tx_local) = current_tx_local_tenant() {
211 return tx_local;
212 }
213 inherited
214}
215
thread_local! {
    /// Transaction-local tenant override for this thread.
    ///
    /// * `None`: no override; `current_tenant` falls through to the session tenant.
    /// * `Some(None)`: override to "no tenant" (`SET LOCAL TENANT NULL`).
    /// * `Some(Some(t))`: override to tenant `t`.
    ///
    /// Managed through `TxLocalTenantGuard`.
    static TX_LOCAL_TENANT: std::cell::RefCell<Option<Option<String>>> =
        const { std::cell::RefCell::new(None) };
}
228
229fn current_tx_local_tenant() -> Option<Option<String>> {
230 TX_LOCAL_TENANT.with(|cell| cell.borrow().clone())
231}
232
233fn parse_set_local_tenant(query: &str) -> RedDBResult<Option<Option<String>>> {
239 let mut tokens = query.split_ascii_whitespace();
240 let Some(w1) = tokens.next() else {
241 return Ok(None);
242 };
243 if !w1.eq_ignore_ascii_case("SET") {
244 return Ok(None);
245 }
246 let Some(w2) = tokens.next() else {
247 return Ok(None);
248 };
249 if !w2.eq_ignore_ascii_case("LOCAL") {
250 return Ok(None);
251 }
252 let Some(w3) = tokens.next() else {
253 return Ok(None);
254 };
255 if !w3.eq_ignore_ascii_case("TENANT") {
256 return Ok(None);
257 }
258 let rest: String = tokens.collect::<Vec<_>>().join(" ");
259 let rest = rest.trim().trim_end_matches(';').trim();
260 let value_str = rest.strip_prefix('=').map(|s| s.trim()).unwrap_or(rest);
261 if value_str.is_empty() {
262 return Err(RedDBError::Query(
263 "SET LOCAL TENANT expects a string literal or NULL".to_string(),
264 ));
265 }
266 if value_str.eq_ignore_ascii_case("NULL") {
267 return Ok(Some(None));
268 }
269 if value_str.starts_with('\'') && value_str.ends_with('\'') && value_str.len() >= 2 {
270 let inner = &value_str[1..value_str.len() - 1];
271 return Ok(Some(Some(inner.to_string())));
272 }
273 Err(RedDBError::Query(format!(
274 "SET LOCAL TENANT expects a string literal or NULL, got `{value_str}`"
275 )))
276}
277
278pub(crate) struct TxLocalTenantGuard;
279
280impl TxLocalTenantGuard {
281 pub fn install(value: Option<Option<String>>) -> Self {
282 TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = value);
283 Self
284 }
285}
286
287impl Drop for TxLocalTenantGuard {
288 fn drop(&mut self) {
289 TX_LOCAL_TENANT.with(|cell| *cell.borrow_mut() = None);
290 }
291}
292
thread_local! {
    /// Stack of active scope overrides for this thread.
    ///
    /// `current_scope_override` reads only the innermost (last) entry. Entries
    /// are pushed and popped by `ScopeOverrideGuard`.
    static SCOPE_OVERRIDES: std::cell::RefCell<Vec<crate::runtime::within_clause::ScopeOverride>> =
        const { std::cell::RefCell::new(Vec::new()) };
}
303
304pub(crate) fn push_scope_override(over: crate::runtime::within_clause::ScopeOverride) {
305 SCOPE_OVERRIDES.with(|cell| cell.borrow_mut().push(over));
306}
307
308pub(crate) fn pop_scope_override() {
309 SCOPE_OVERRIDES.with(|cell| {
310 cell.borrow_mut().pop();
311 });
312}
313
314pub(crate) fn current_scope_override() -> Option<crate::runtime::within_clause::ScopeOverride> {
315 SCOPE_OVERRIDES.with(|cell| cell.borrow().last().cloned())
316}
317
318pub(crate) fn has_scope_override_active() -> bool {
322 SCOPE_OVERRIDES.with(|cell| !cell.borrow().is_empty())
323}
324
325pub(crate) struct ScopeOverrideGuard;
329
330impl ScopeOverrideGuard {
331 pub fn install(over: crate::runtime::within_clause::ScopeOverride) -> Self {
332 push_scope_override(over);
333 Self
334 }
335}
336
337impl Drop for ScopeOverrideGuard {
338 fn drop(&mut self) {
339 pop_scope_override();
340 }
341}
342
343pub(crate) fn current_user_projected() -> Option<String> {
349 let inherited = current_auth_identity().map(|(u, _)| u);
350 if let Some(over) = current_scope_override() {
351 if over.user.is_active() {
352 return over.user.resolve(inherited);
353 }
354 }
355 inherited
356}
357
358pub(crate) fn current_role_projected() -> Option<String> {
359 let inherited = current_auth_identity().map(|(_, r)| format!("{r:?}").to_lowercase());
360 if let Some(over) = current_scope_override() {
361 if over.role.is_active() {
362 return over.role.resolve(inherited);
363 }
364 }
365 inherited
366}
367
368pub(crate) fn current_secret_value(path: &str) -> Option<String> {
369 let key = path.to_ascii_lowercase();
370 CURRENT_SECRET_RESOLVER.with(|cell| {
371 let mut resolver = cell.borrow_mut();
372 let resolver = resolver.as_mut()?;
373 if resolver.values.is_none() {
374 resolver.values = resolver
375 .store
376 .as_ref()
377 .map(|store| store.vault_kv_snapshot());
378 }
379 let values = resolver.values.as_ref()?;
380 values.get(&key).cloned().or_else(|| {
381 key.strip_prefix("red.vault/").and_then(|rest| {
382 values
383 .get(rest)
384 .cloned()
385 .or_else(|| values.get(&format!("red.secret.{rest}")).cloned())
386 })
387 })
388 })
389}
390
/// Per-thread secret lookup state installed by `SecretStoreGuard`.
struct SecretResolver {
    /// Auth store that supplies `vault_kv_snapshot()`; `None` means no secrets
    /// can be resolved.
    store: Option<Arc<crate::auth::store::AuthStore>>,
    /// Lowercased key -> secret value, loaded lazily on the first
    /// `current_secret_value` call and patched by `update_current_secret_value`.
    values: Option<HashMap<String, String>>,
}
395
396pub(super) struct SecretStoreGuard {
397 previous: Option<SecretResolver>,
398}
399
400impl SecretStoreGuard {
401 pub(super) fn install(store: Option<Arc<crate::auth::store::AuthStore>>) -> Self {
402 let previous = CURRENT_SECRET_RESOLVER.with(|cell| {
403 cell.replace(Some(SecretResolver {
404 store,
405 values: None,
406 }))
407 });
408 Self { previous }
409 }
410}
411
412impl Drop for SecretStoreGuard {
413 fn drop(&mut self) {
414 let previous = self.previous.take();
415 CURRENT_SECRET_RESOLVER.with(|cell| {
416 cell.replace(previous);
417 });
418 }
419}
420
421pub(crate) fn current_config_value(path: &str) -> Option<Value> {
422 let key = path.to_ascii_lowercase();
423 CURRENT_CONFIG_RESOLVER.with(|cell| {
424 let mut resolver = cell.borrow_mut();
425 let resolver = resolver.as_mut()?;
426 if resolver.values.is_none() {
427 resolver.values = Some(latest_config_snapshot(&resolver.db));
428 }
429 let values = resolver.values.as_ref()?;
430 values.get(&key).cloned().or_else(|| {
431 key.strip_prefix("red.config/")
432 .and_then(|rest| values.get(&format!("red.config.{rest}")).cloned())
433 })
434 })
435}
436
437fn update_current_config_value(path: &str, value: Value) {
438 let key = path.to_ascii_lowercase();
439 CURRENT_CONFIG_RESOLVER.with(|cell| {
440 if let Some(resolver) = cell.borrow_mut().as_mut() {
441 if let Some(values) = resolver.values.as_mut() {
442 values.insert(key, value);
443 }
444 }
445 });
446}
447
448fn update_current_secret_value(path: &str, value: Option<String>) {
449 let key = path.to_ascii_lowercase();
450 CURRENT_SECRET_RESOLVER.with(|cell| {
451 if let Some(resolver) = cell.borrow_mut().as_mut() {
452 let Some(values) = resolver.values.as_mut() else {
453 return;
454 };
455 match value {
456 Some(value) => {
457 values.insert(key, value);
458 }
459 None => {
460 values.remove(&key);
461 }
462 }
463 }
464 });
465}
466
/// Builds a lowercase-keyed map of configuration values from the two config
/// collections. For each key it keeps the value from the entity with the
/// largest raw id; see `insert_latest_config_value` for the tie rule.
///
/// * `red_config`: rows with a text `key` column, stored under the lowercased
///   key. Keys of the form `red.config.<rest>` are also stored as
///   `red.config/<rest>`.
/// * `red.config`: rows with a text `key` column, stored as
///   `red.config/<lowercased key>`. Rows whose `tombstone` column is `true`
///   are skipped.
///
/// A missing `value` column yields `Value::Null`. Non-row entities and rows
/// without a text `key` are ignored.
///
/// NOTE(review): "largest id wins" presumably relies on raw ids increasing
/// with write order across both collections; confirm.
/// NOTE(review): a tombstone row is skipped rather than shadowing older rows
/// with the same key. Confirm that older versions are removed when a key is
/// deleted, otherwise a deleted key would still resolve.
fn latest_config_snapshot(db: &RedDB) -> HashMap<String, Value> {
    // key -> (entity id that produced the value, value)
    let mut latest: HashMap<String, (u64, Value)> = HashMap::new();

    if let Some(manager) = db.store().get_collection("red_config") {
        manager.for_each_entity(|entity| {
            let Some(row) = entity.data.as_row() else {
                return true;
            };
            let Some(Value::Text(key)) = row.get_field("key") else {
                return true;
            };
            let value = row.get_field("value").cloned().unwrap_or(Value::Null);
            let id = entity.id.raw();
            let key = key.to_ascii_lowercase();
            insert_latest_config_value(&mut latest, key.clone(), id, value.clone());
            // Expose dotted `red.config.<rest>` keys under the slash form as well.
            if let Some(rest) = key.strip_prefix("red.config.") {
                insert_latest_config_value(&mut latest, format!("red.config/{rest}"), id, value);
            }
            // Returning `true` continues the iteration.
            true
        });
    }

    if let Some(manager) = db.store().get_collection("red.config") {
        manager.for_each_entity(|entity| {
            let Some(row) = entity.data.as_row() else {
                return true;
            };
            if matches!(row.get_field("tombstone"), Some(Value::Boolean(true))) {
                return true;
            }
            let Some(Value::Text(key)) = row.get_field("key") else {
                return true;
            };
            let value = row.get_field("value").cloned().unwrap_or(Value::Null);
            insert_latest_config_value(
                &mut latest,
                format!("red.config/{}", key.to_ascii_lowercase()),
                entity.id.raw(),
                value,
            );
            true
        });
    }

    // Drop the ids used for version selection.
    latest
        .into_iter()
        .map(|(key, (_, value))| (key, value))
        .collect()
}
516
517fn insert_latest_config_value(
518 latest: &mut HashMap<String, (u64, Value)>,
519 key: String,
520 id: u64,
521 value: Value,
522) {
523 match latest.get(&key) {
524 Some((prev_id, _)) if *prev_id > id => {}
525 _ => {
526 latest.insert(key, (id, value));
527 }
528 }
529}
530
/// Per-thread configuration lookup state installed by `ConfigSnapshotGuard`.
struct ConfigResolver {
    /// Database read by `latest_config_snapshot` when the snapshot is first needed.
    db: Arc<RedDB>,
    /// Lowercased key -> value, loaded lazily by `current_config_value` and
    /// patched by `update_current_config_value`.
    values: Option<HashMap<String, Value>>,
}
535
536pub(super) struct ConfigSnapshotGuard {
537 previous: Option<ConfigResolver>,
538}
539
540impl ConfigSnapshotGuard {
541 pub(super) fn install(db: Arc<RedDB>) -> Self {
542 let previous = CURRENT_CONFIG_RESOLVER
543 .with(|cell| cell.replace(Some(ConfigResolver { db, values: None })));
544 Self { previous }
545 }
546}
547
548impl Drop for ConfigSnapshotGuard {
549 fn drop(&mut self) {
550 let previous = self.previous.take();
551 CURRENT_CONFIG_RESOLVER.with(|cell| {
552 cell.replace(previous);
553 });
554 }
555}
556
557pub fn set_current_snapshot(ctx: SnapshotContext) {
562 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = Some(ctx));
563 HAS_SNAPSHOT.with(|c| c.set(true));
564}
565
566pub fn clear_current_snapshot() {
567 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = None);
568 HAS_SNAPSHOT.with(|c| c.set(false));
569}
570
571pub(crate) struct CurrentSnapshotGuard {
577 previous: Option<SnapshotContext>,
578}
579
580impl CurrentSnapshotGuard {
581 pub(crate) fn install(ctx: SnapshotContext) -> Self {
582 let previous = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
583 set_current_snapshot(ctx);
584 Self { previous }
585 }
586}
587
588impl Drop for CurrentSnapshotGuard {
589 fn drop(&mut self) {
590 let prev = self.previous.take();
591 let has = prev.is_some();
592 CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = prev);
593 HAS_SNAPSHOT.with(|c| c.set(has));
594 }
595}
596
597#[inline]
608pub fn entity_visible_under_current_snapshot(
609 entity: &crate::storage::unified::entity::UnifiedEntity,
610) -> bool {
611 if !HAS_SNAPSHOT.with(|c| c.get()) {
617 return true;
618 }
619 CURRENT_SNAPSHOT.with(|cell| {
620 let guard = cell.borrow();
621 let Some(ctx) = guard.as_ref() else {
622 return true;
623 };
624 visibility_check(ctx, entity.xmin, entity.xmax)
625 })
626}
627
628#[inline]
633pub(crate) fn xids_visible_under_current_snapshot(xmin: u64, xmax: u64) -> bool {
634 if !HAS_SNAPSHOT.with(|c| c.get()) {
635 return true;
636 }
637 CURRENT_SNAPSHOT.with(|cell| {
638 let guard = cell.borrow();
639 let Some(ctx) = guard.as_ref() else {
640 return true;
641 };
642 visibility_check(ctx, xmin, xmax)
643 })
644}
645
646pub fn capture_current_snapshot() -> Option<SnapshotContext> {
653 CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone())
654}
655
/// Portable copy of a thread's snapshot, auth identity and session tenant.
///
/// Captured with `snapshot_bundle` and re-installed on another scope or
/// thread with `with_snapshot_bundle`.
#[derive(Clone, Default)]
pub struct SnapshotBundle {
    /// Transaction snapshot, if one was installed.
    pub snapshot: Option<SnapshotContext>,
    /// Authenticated `(username, role)`, if any.
    pub auth: Option<(String, crate::auth::Role)>,
    /// Session-level tenant id (not the override-resolved `current_tenant`).
    pub tenant: Option<String>,
}
676
677pub fn snapshot_bundle() -> SnapshotBundle {
680 SnapshotBundle {
681 snapshot: capture_current_snapshot(),
682 auth: current_auth_identity(),
683 tenant: CURRENT_TENANT_ID.with(|cell| cell.borrow().clone()),
684 }
685}
686
/// Runs `f` with the snapshot, auth identity and session tenant from `bundle`
/// installed on the current thread, and returns `f`'s result.
///
/// The three previous values are saved first and restored by a drop guard
/// once `f` finishes, including when `f` unwinds. Scope overrides, the
/// transaction-local tenant and the config/secret resolvers are not touched.
pub fn with_snapshot_bundle<R>(bundle: &SnapshotBundle, f: impl FnOnce() -> R) -> R {
    // Holds the values that were active before the bundle was installed.
    struct Guard {
        prev_snapshot: Option<SnapshotContext>,
        prev_auth: Option<(String, crate::auth::Role)>,
        prev_tenant: Option<String>,
    }
    impl Drop for Guard {
        fn drop(&mut self) {
            let snap = self.prev_snapshot.take();
            let has = snap.is_some();
            // Keep the fast-path flag consistent with the restored snapshot.
            CURRENT_SNAPSHOT.with(|cell| *cell.borrow_mut() = snap);
            HAS_SNAPSHOT.with(|c| c.set(has));
            CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = self.prev_auth.take());
            CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = self.prev_tenant.take());
        }
    }

    let _guard = {
        let prev_snapshot = CURRENT_SNAPSHOT.with(|cell| cell.borrow().clone());
        let prev_auth = CURRENT_AUTH_IDENTITY.with(|cell| cell.borrow().clone());
        let prev_tenant = CURRENT_TENANT_ID.with(|cell| cell.borrow().clone());

        // Install the bundle's values (these helpers also update HAS_SNAPSHOT).
        match bundle.snapshot.clone() {
            Some(ctx) => set_current_snapshot(ctx),
            None => clear_current_snapshot(),
        }
        CURRENT_AUTH_IDENTITY.with(|cell| *cell.borrow_mut() = bundle.auth.clone());
        CURRENT_TENANT_ID.with(|cell| *cell.borrow_mut() = bundle.tenant.clone());

        Guard {
            prev_snapshot,
            prev_auth,
            prev_tenant,
        }
    };
    // `_guard` is dropped after `f` returns, restoring the saved values.
    f()
}
728
729#[inline]
733pub fn entity_visible_with_context(
734 ctx: Option<&SnapshotContext>,
735 entity: &crate::storage::unified::entity::UnifiedEntity,
736) -> bool {
737 match ctx {
738 Some(ctx) => visibility_check(ctx, entity.xmin, entity.xmax),
739 None => true,
740 }
741}
742
/// Decides whether a row version created by `xmin` and deleted by `xmax` is
/// visible under `ctx`. A value of `0` is treated as "no transaction" for
/// either id.
///
/// The checks run in this order:
/// 1. A row created by an aborted transaction is never visible.
/// 2. A deletion by an aborted transaction is ignored (treated as `xmax = 0`).
/// 3. A row deleted by one of `ctx.own_xids` is invisible.
/// 4. A row created by one of `ctx.own_xids` is visible.
/// 5. Otherwise `ctx.snapshot.sees(xmin, effective_xmax)` decides.
#[inline]
fn visibility_check(ctx: &SnapshotContext, xmin: u64, xmax: u64) -> bool {
    if xmin != 0 && ctx.manager.is_aborted(xmin) {
        return false;
    }
    let effective_xmax = if xmax != 0 && ctx.manager.is_aborted(xmax) {
        // The deleting transaction rolled back, so the row is effectively undeleted.
        0
    } else {
        xmax
    };
    let own_xmin = xmin != 0 && ctx.own_xids.contains(&xmin);
    let own_xmax = effective_xmax != 0 && ctx.own_xids.contains(&effective_xmax);
    // Our own deletion wins over our own insertion (insert then delete in one tx).
    if own_xmax {
        return false;
    }
    if own_xmin {
        return true;
    }
    ctx.snapshot.sees(xmin, effective_xmax)
}
771
772fn runtime_pool_lock(runtime: &RedDBRuntime) -> std::sync::MutexGuard<'_, PoolState> {
773 runtime
774 .inner
775 .pool
776 .lock()
777 .unwrap_or_else(|poisoned| poisoned.into_inner())
778}
779
780fn cache_scope_insert(scopes: &mut HashSet<String>, name: &str) {
781 if name.is_empty() || name.starts_with("__subq_") || is_universal_query_source(name) {
782 return;
783 }
784 scopes.insert(name.to_string());
785}
786
787fn collect_table_source_scopes(scopes: &mut HashSet<String>, query: &TableQuery) {
788 match query.source.as_ref() {
789 Some(crate::storage::query::ast::TableSource::Name(name)) => {
790 cache_scope_insert(scopes, name)
791 }
792 Some(crate::storage::query::ast::TableSource::Subquery(subquery)) => {
793 collect_query_expr_result_cache_scopes(scopes, subquery);
794 }
795 None => cache_scope_insert(scopes, &query.table),
796 }
797}
798
799fn collect_vector_source_scopes(
800 scopes: &mut HashSet<String>,
801 source: &crate::storage::query::ast::VectorSource,
802) {
803 match source {
804 crate::storage::query::ast::VectorSource::Reference { collection, .. } => {
805 cache_scope_insert(scopes, collection);
806 }
807 crate::storage::query::ast::VectorSource::Subquery(subquery) => {
808 collect_query_expr_result_cache_scopes(scopes, subquery);
809 }
810 crate::storage::query::ast::VectorSource::Literal(_)
811 | crate::storage::query::ast::VectorSource::Text(_) => {}
812 }
813}
814
815fn collect_path_selector_scopes(
816 scopes: &mut HashSet<String>,
817 selector: &crate::storage::query::ast::NodeSelector,
818) {
819 if let crate::storage::query::ast::NodeSelector::ByRow { table, .. } = selector {
820 cache_scope_insert(scopes, table);
821 }
822}
823
/// Collects into `scopes` the collection/table names that `expr` reads or
/// writes; presumably used to scope result-cache entries (TODO confirm with
/// callers).
///
/// Every name goes through `cache_scope_insert`, so empty names, `__subq_*`
/// aliases and universal query sources are never recorded. Nested queries are
/// walked recursively: table subqueries, join sides, the structured part of a
/// hybrid query, vector subqueries and view bodies. Statements that do not
/// name a collection contribute nothing.
fn collect_query_expr_result_cache_scopes(scopes: &mut HashSet<String>, expr: &QueryExpr) {
    match expr {
        // Read queries: walk their sources.
        QueryExpr::Table(query) => collect_table_source_scopes(scopes, query),
        QueryExpr::Join(query) => {
            collect_query_expr_result_cache_scopes(scopes, &query.left);
            collect_query_expr_result_cache_scopes(scopes, &query.right);
        }
        QueryExpr::Path(query) => {
            collect_path_selector_scopes(scopes, &query.from);
            collect_path_selector_scopes(scopes, &query.to);
        }
        QueryExpr::Vector(query) => {
            cache_scope_insert(scopes, &query.collection);
            collect_vector_source_scopes(scopes, &query.query_vector);
        }
        QueryExpr::Hybrid(query) => {
            collect_query_expr_result_cache_scopes(scopes, &query.structured);
            cache_scope_insert(scopes, &query.vector.collection);
            collect_vector_source_scopes(scopes, &query.vector.query_vector);
        }
        // DML and DDL: the target collection.
        QueryExpr::Insert(query) => cache_scope_insert(scopes, &query.table),
        QueryExpr::Update(query) => cache_scope_insert(scopes, &query.table),
        QueryExpr::Delete(query) => cache_scope_insert(scopes, &query.table),
        QueryExpr::CreateTable(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::DropTable(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::DropGraph(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::DropVector(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::DropDocument(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::DropKv(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::DropCollection(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::Truncate(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::AlterTable(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::CreateIndex(query) => cache_scope_insert(scopes, &query.table),
        QueryExpr::DropIndex(query) => cache_scope_insert(scopes, &query.table),
        QueryExpr::CreateTimeSeries(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::DropTimeSeries(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::CreateQueue(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::AlterQueue(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::DropQueue(query) => cache_scope_insert(scopes, &query.name),
        QueryExpr::QueueSelect(query) => cache_scope_insert(scopes, &query.queue),
        QueryExpr::QueueCommand(query) => match query {
            QueueCommand::Push { queue, .. }
            | QueueCommand::Pop { queue, .. }
            | QueueCommand::Peek { queue, .. }
            | QueueCommand::Len { queue }
            | QueueCommand::Purge { queue }
            | QueueCommand::GroupCreate { queue, .. }
            | QueueCommand::GroupRead { queue, .. }
            | QueueCommand::Pending { queue, .. }
            | QueueCommand::Claim { queue, .. }
            | QueueCommand::Ack { queue, .. }
            | QueueCommand::Nack { queue, .. } => cache_scope_insert(scopes, queue),
            // A move touches both queues.
            QueueCommand::Move {
                source,
                destination,
                ..
            } => {
                cache_scope_insert(scopes, source);
                cache_scope_insert(scopes, destination);
            }
        },
        QueryExpr::EventsBackfill(query) => {
            cache_scope_insert(scopes, &query.collection);
            cache_scope_insert(scopes, &query.target_queue);
        }
        QueryExpr::CreateTree(query) => cache_scope_insert(scopes, &query.collection),
        QueryExpr::DropTree(query) => cache_scope_insert(scopes, &query.collection),
        QueryExpr::TreeCommand(query) => match query {
            TreeCommand::Insert { collection, .. }
            | TreeCommand::Move { collection, .. }
            | TreeCommand::Delete { collection, .. }
            | TreeCommand::Validate { collection, .. }
            | TreeCommand::Rebalance { collection, .. } => cache_scope_insert(scopes, collection),
        },
        QueryExpr::SearchCommand(query) => match query {
            SearchCommand::Similar { collection, .. }
            | SearchCommand::Hybrid { collection, .. }
            | SearchCommand::SpatialRadius { collection, .. }
            | SearchCommand::SpatialBbox { collection, .. }
            | SearchCommand::SpatialNearest { collection, .. } => {
                cache_scope_insert(scopes, collection);
            }
            // These searches may run without a collection (`Option`).
            SearchCommand::Text { collection, .. }
            | SearchCommand::Multimodal { collection, .. }
            | SearchCommand::Index { collection, .. }
            | SearchCommand::Context { collection, .. } => {
                if let Some(collection) = collection.as_deref() {
                    cache_scope_insert(scopes, collection);
                }
            }
        },
        QueryExpr::Ask(query) => {
            if let Some(collection) = query.collection.as_deref() {
                cache_scope_insert(scopes, collection);
            }
        }
        QueryExpr::ExplainAlter(query) => cache_scope_insert(scopes, &query.target.name),
        QueryExpr::MaintenanceCommand(cmd) => match cmd {
            crate::storage::query::ast::MaintenanceCommand::Vacuum { target, .. }
            | crate::storage::query::ast::MaintenanceCommand::Analyze { target } => {
                if let Some(t) = target {
                    cache_scope_insert(scopes, t);
                }
            }
        },
        QueryExpr::CopyFrom(cmd) => cache_scope_insert(scopes, &cmd.table),
        // A view is scoped by its own name plus everything its body reads.
        QueryExpr::CreateView(cmd) => {
            cache_scope_insert(scopes, &cmd.name);
            collect_query_expr_result_cache_scopes(scopes, &cmd.query);
        }
        QueryExpr::DropView(cmd) => cache_scope_insert(scopes, &cmd.name),
        QueryExpr::RefreshMaterializedView(cmd) => cache_scope_insert(scopes, &cmd.name),
        QueryExpr::CreatePolicy(cmd) => cache_scope_insert(scopes, &cmd.table),
        QueryExpr::DropPolicy(cmd) => cache_scope_insert(scopes, &cmd.table),
        QueryExpr::CreateServer(_) | QueryExpr::DropServer(_) => {}
        QueryExpr::CreateForeignTable(cmd) => cache_scope_insert(scopes, &cmd.name),
        QueryExpr::DropForeignTable(cmd) => cache_scope_insert(scopes, &cmd.name),
        // Statements with no collection scope.
        QueryExpr::Graph(_)
        | QueryExpr::GraphCommand(_)
        | QueryExpr::ProbabilisticCommand(_)
        | QueryExpr::SetConfig { .. }
        | QueryExpr::ShowConfig { .. }
        | QueryExpr::SetSecret { .. }
        | QueryExpr::DeleteSecret { .. }
        | QueryExpr::ShowSecrets { .. }
        | QueryExpr::SetTenant(_)
        | QueryExpr::ShowTenant
        | QueryExpr::TransactionControl(_)
        | QueryExpr::CreateSchema(_)
        | QueryExpr::DropSchema(_)
        | QueryExpr::CreateSequence(_)
        | QueryExpr::DropSequence(_)
        | QueryExpr::Grant(_)
        | QueryExpr::Revoke(_)
        | QueryExpr::AlterUser(_)
        | QueryExpr::CreateIamPolicy { .. }
        | QueryExpr::DropIamPolicy { .. }
        | QueryExpr::AttachPolicy { .. }
        | QueryExpr::DetachPolicy { .. }
        | QueryExpr::ShowPolicies { .. }
        | QueryExpr::ShowEffectivePermissions { .. }
        | QueryExpr::SimulatePolicy { .. }
        | QueryExpr::CreateMigration(_)
        | QueryExpr::ApplyMigration(_)
        | QueryExpr::RollbackMigration(_)
        | QueryExpr::ExplainMigration(_)
        | QueryExpr::EventsBackfillStatus { .. } => {}
        QueryExpr::KvCommand(cmd) => {
            use crate::storage::query::ast::KvCommand;
            match cmd {
                KvCommand::Put { collection, .. }
                | KvCommand::InvalidateTags { collection, .. }
                | KvCommand::Get { collection, .. }
                | KvCommand::Unseal { collection, .. }
                | KvCommand::Rotate { collection, .. }
                | KvCommand::History { collection, .. }
                | KvCommand::List { collection, .. }
                | KvCommand::Purge { collection, .. }
                | KvCommand::Watch { collection, .. }
                | KvCommand::Delete { collection, .. }
                | KvCommand::Incr { collection, .. }
                | KvCommand::Cas { collection, .. } => cache_scope_insert(scopes, collection),
            }
        }
        QueryExpr::ConfigCommand(cmd) => {
            use crate::storage::query::ast::ConfigCommand;
            match cmd {
                ConfigCommand::Put { collection, .. }
                | ConfigCommand::Get { collection, .. }
                | ConfigCommand::Resolve { collection, .. }
                | ConfigCommand::Rotate { collection, .. }
                | ConfigCommand::Delete { collection, .. }
                | ConfigCommand::History { collection, .. }
                | ConfigCommand::List { collection, .. }
                | ConfigCommand::Watch { collection, .. }
                | ConfigCommand::InvalidVolatileOperation { collection, .. } => {
                    cache_scope_insert(scopes, collection)
                }
            }
        }
    }
}
1007
1008pub(crate) fn rls_policy_filter(
1016 runtime: &RedDBRuntime,
1017 table: &str,
1018 action: crate::storage::query::ast::PolicyAction,
1019) -> Option<crate::storage::query::ast::Filter> {
1020 rls_policy_filter_for_kind(
1021 runtime,
1022 table,
1023 action,
1024 crate::storage::query::ast::PolicyTargetKind::Table,
1025 )
1026}
1027
1028pub(crate) fn rls_policy_filter_for_kind(
1034 runtime: &RedDBRuntime,
1035 table: &str,
1036 action: crate::storage::query::ast::PolicyAction,
1037 kind: crate::storage::query::ast::PolicyTargetKind,
1038) -> Option<crate::storage::query::ast::Filter> {
1039 use crate::storage::query::ast::Filter;
1040
1041 if !runtime.inner.rls_enabled_tables.read().contains(table) {
1042 return None;
1043 }
1044 let role = current_auth_identity().map(|(_, role)| role);
1045 let role_str = role.map(|r| r.as_str().to_string());
1046 let policies = runtime.matching_rls_policies_for_kind(table, role_str.as_deref(), action, kind);
1047 if policies.is_empty() {
1048 return None;
1049 }
1050 policies
1051 .into_iter()
1052 .reduce(|acc, f| Filter::Or(Box::new(acc), Box::new(f)))
1053}
1054
1055pub(crate) fn rls_is_enabled(runtime: &RedDBRuntime, table: &str) -> bool {
1059 runtime.inner.rls_enabled_tables.read().contains(table)
1060}
1061
1062fn node_passes_rls(
1069 runtime: &RedDBRuntime,
1070 collection: &str,
1071 role: Option<&str>,
1072 cache: &mut std::collections::HashMap<String, Option<crate::storage::query::ast::Filter>>,
1073 entity: &crate::storage::unified::entity::UnifiedEntity,
1074) -> bool {
1075 use crate::storage::query::ast::{Filter, PolicyAction, PolicyTargetKind};
1076
1077 if !runtime.inner.rls_enabled_tables.read().contains(collection) {
1078 return true;
1079 }
1080 let filter = cache.entry(collection.to_string()).or_insert_with(|| {
1081 let policies = runtime.matching_rls_policies_for_kind(
1082 collection,
1083 role,
1084 PolicyAction::Select,
1085 PolicyTargetKind::Nodes,
1086 );
1087 if policies.is_empty() {
1088 None
1089 } else {
1090 policies
1091 .into_iter()
1092 .reduce(|acc, f| Filter::Or(Box::new(acc), Box::new(f)))
1093 }
1094 });
1095 let Some(filter) = filter else {
1096 return false;
1097 };
1098 crate::runtime::query_exec::evaluate_entity_filter_with_db(
1099 Some(&runtime.inner.db),
1100 entity,
1101 filter,
1102 collection,
1103 collection,
1104 )
1105}
1106
1107fn edge_passes_rls(
1110 runtime: &RedDBRuntime,
1111 collection: &str,
1112 role: Option<&str>,
1113 cache: &mut std::collections::HashMap<String, Option<crate::storage::query::ast::Filter>>,
1114 entity: &crate::storage::unified::entity::UnifiedEntity,
1115) -> bool {
1116 use crate::storage::query::ast::{Filter, PolicyAction, PolicyTargetKind};
1117
1118 if !runtime.inner.rls_enabled_tables.read().contains(collection) {
1119 return true;
1120 }
1121 let filter = cache.entry(collection.to_string()).or_insert_with(|| {
1122 let policies = runtime.matching_rls_policies_for_kind(
1123 collection,
1124 role,
1125 PolicyAction::Select,
1126 PolicyTargetKind::Edges,
1127 );
1128 if policies.is_empty() {
1129 None
1130 } else {
1131 policies
1132 .into_iter()
1133 .reduce(|acc, f| Filter::Or(Box::new(acc), Box::new(f)))
1134 }
1135 });
1136 let Some(filter) = filter else {
1137 return false;
1138 };
1139 crate::runtime::query_exec::evaluate_entity_filter_with_db(
1140 Some(&runtime.inner.db),
1141 entity,
1142 filter,
1143 collection,
1144 collection,
1145 )
1146}
1147
1148fn inject_rls_filters(
1169 runtime: &RedDBRuntime,
1170 frame: &dyn super::statement_frame::ReadFrame,
1171 mut table: crate::storage::query::ast::TableQuery,
1172) -> Option<crate::storage::query::ast::TableQuery> {
1173 use crate::storage::query::ast::{Filter, PolicyAction};
1174
1175 let role = frame.identity().map(|(_, role)| role);
1177 let role_str = role.map(|r| r.as_str().to_string());
1178 let policies =
1179 runtime.matching_rls_policies(&table.table, role_str.as_deref(), PolicyAction::Select);
1180
1181 if policies.is_empty() {
1182 return None;
1185 }
1186
1187 let combined = policies
1189 .into_iter()
1190 .reduce(|acc, f| Filter::Or(Box::new(acc), Box::new(f)))
1191 .expect("policies non-empty");
1192
1193 table.filter = Some(match table.filter.take() {
1195 Some(existing) => Filter::And(Box::new(existing), Box::new(combined)),
1196 None => combined,
1197 });
1198 Some(table)
1199}
1200
1201fn inject_rls_into_join(
1211 runtime: &RedDBRuntime,
1212 frame: &dyn super::statement_frame::ReadFrame,
1213 mut join: crate::storage::query::ast::JoinQuery,
1214) -> Option<crate::storage::query::ast::JoinQuery> {
1215 use crate::storage::query::ast::Filter;
1216
1217 let mut policy_filters: Vec<Filter> = Vec::new();
1218 if !collect_join_side_policy(runtime, frame, join.left.as_ref(), &mut policy_filters) {
1219 return None;
1220 }
1221 if !collect_join_side_policy(runtime, frame, join.right.as_ref(), &mut policy_filters) {
1222 return None;
1223 }
1224
1225 if policy_filters.is_empty() {
1226 return Some(join);
1227 }
1228
1229 let combined = policy_filters
1230 .into_iter()
1231 .reduce(|acc, f| Filter::And(Box::new(acc), Box::new(f)))
1232 .expect("policy_filters non-empty");
1233
1234 join.filter = Some(match join.filter.take() {
1235 Some(existing) => Filter::And(Box::new(existing), Box::new(combined)),
1236 None => combined,
1237 });
1238
1239 Some(join)
1240}
1241
1242fn collect_join_side_policy(
1247 runtime: &RedDBRuntime,
1248 frame: &dyn super::statement_frame::ReadFrame,
1249 expr: &crate::storage::query::ast::QueryExpr,
1250 out: &mut Vec<crate::storage::query::ast::Filter>,
1251) -> bool {
1252 use crate::storage::query::ast::{Filter, PolicyAction, QueryExpr};
1253 match expr {
1254 QueryExpr::Table(t) => {
1255 if !runtime.inner.rls_enabled_tables.read().contains(&t.table) {
1256 return true;
1257 }
1258 let role = frame.identity().map(|(_, role)| role);
1259 let role_str = role.map(|r| r.as_str().to_string());
1260 let policies =
1261 runtime.matching_rls_policies(&t.table, role_str.as_deref(), PolicyAction::Select);
1262 if policies.is_empty() {
1263 return false;
1264 }
1265 let combined = policies
1266 .into_iter()
1267 .reduce(|acc, f| Filter::Or(Box::new(acc), Box::new(f)))
1268 .expect("policies non-empty");
1269 out.push(combined);
1270 true
1271 }
1272 QueryExpr::Join(inner) => {
1273 collect_join_side_policy(runtime, frame, inner.left.as_ref(), out)
1274 && collect_join_side_policy(runtime, frame, inner.right.as_ref(), out)
1275 }
1276 _ => true,
1277 }
1278}
1279
1280fn apply_foreign_table_filters(
1291 records: Vec<crate::storage::query::unified::UnifiedRecord>,
1292 query: &crate::storage::query::ast::TableQuery,
1293) -> crate::storage::query::unified::UnifiedResult {
1294 use crate::storage::query::sql_lowering::{
1295 effective_table_filter, effective_table_projections,
1296 };
1297 use crate::storage::query::unified::UnifiedResult;
1298
1299 let filter = effective_table_filter(query);
1300 let projections = effective_table_projections(query);
1301
1302 let mut filtered: Vec<_> = records
1305 .into_iter()
1306 .filter(|record| match &filter {
1307 Some(f) => {
1308 super::join_filter::evaluate_runtime_filter_with_db(None, record, f, None, None)
1309 }
1310 None => true,
1311 })
1312 .collect();
1313
1314 if let Some(offset) = query.offset {
1316 let offset = offset as usize;
1317 if offset >= filtered.len() {
1318 filtered.clear();
1319 } else {
1320 filtered.drain(0..offset);
1321 }
1322 }
1323 if let Some(limit) = query.limit {
1324 filtered.truncate(limit as usize);
1325 }
1326
1327 let columns: Vec<String> = if projections.is_empty() {
1330 filtered
1331 .first()
1332 .map(|r| r.column_names().iter().map(|k| k.to_string()).collect())
1333 .unwrap_or_default()
1334 } else {
1335 projections
1336 .iter()
1337 .map(super::join_filter::projection_name)
1338 .collect()
1339 };
1340
1341 let mut result = UnifiedResult::empty();
1342 result.columns = columns;
1343 result.records = filtered;
1344 result
1345}
1346
1347pub(crate) fn collect_table_refs(expr: &QueryExpr) -> Vec<String> {
1354 let mut scopes: HashSet<String> = HashSet::new();
1355 collect_query_expr_result_cache_scopes(&mut scopes, expr);
1356 scopes.into_iter().collect()
1357}
1358
1359fn query_expr_result_cache_scopes(expr: &QueryExpr) -> HashSet<String> {
1360 let mut scopes = HashSet::new();
1361 collect_query_expr_result_cache_scopes(&mut scopes, expr);
1362 scopes
1363}
1364
/// Config key naming the result-cache backend. Presumably parsed into
/// `RuntimeResultCacheBackend`; confirm at the reader.
const RESULT_CACHE_BACKEND_KEY: &str = "runtime.result_cache.backend";
/// Backend name assumed when the key above is unset (presumably maps to `Legacy`).
const RESULT_CACHE_DEFAULT_BACKEND: &str = "legacy";
/// Namespace for result-cache payloads in the Blob Cache (inferred from the name).
const RESULT_CACHE_BLOB_NAMESPACE: &str = "runtime.result_cache";
/// Cached-result lifetime in seconds (inferred from the name; not read in this section).
const RESULT_CACHE_TTL_SECS: u64 = 30;
/// Entry cap: `trim_result_cache` evicts until the map holds at most this many entries.
const RESULT_CACHE_MAX_ENTRIES: usize = 1000;
/// 8-byte header written first by `encode_result_cache_payload` and required by
/// `decode_result_cache_payload`. The trailing `0001` looks like a format
/// version; presumably bump it whenever the payload layout changes.
const RESULT_CACHE_PAYLOAD_MAGIC: &[u8; 8] = b"RDRC0001";
1371
/// Which storage the runtime result cache uses.
///
/// NOTE(review): variant meanings below are inferred from their names and
/// from the `result_cache`, `result_blob_cache` and
/// `result_cache_shadow_divergences` fields initialised in `with_pool`.
/// Confirm against the code that selects the backend.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RuntimeResultCacheBackend {
    /// Presumably the in-process `HashMap` + `VecDeque` cache.
    Legacy,
    /// Presumably the Blob Cache, storing `RDRC0001`-encoded payloads.
    BlobCache,
    /// Presumably runs both backends and counts mismatches.
    Shadow,
}
1378
1379fn trim_result_cache(
1380 map: &mut HashMap<String, RuntimeResultCacheEntry>,
1381 order: &mut std::collections::VecDeque<String>,
1382) {
1383 while map.len() > RESULT_CACHE_MAX_ENTRIES {
1384 if let Some(oldest) = order.pop_front() {
1385 map.remove(&oldest);
1386 } else {
1387 break;
1388 }
1389 }
1390}
1391
1392fn result_cache_fingerprint(result: &RuntimeQueryResult) -> String {
1393 format!(
1394 "{:?}|{}|{}|{}|{}|{:?}",
1395 result.result,
1396 result.query,
1397 result.statement,
1398 result.engine,
1399 result.affected_rows,
1400 result.statement_type
1401 )
1402}
1403
1404fn mode_to_byte(mode: crate::storage::query::modes::QueryMode) -> u8 {
1405 match mode {
1406 crate::storage::query::modes::QueryMode::Sql => 0,
1407 crate::storage::query::modes::QueryMode::Gremlin => 1,
1408 crate::storage::query::modes::QueryMode::Cypher => 2,
1409 crate::storage::query::modes::QueryMode::Sparql => 3,
1410 crate::storage::query::modes::QueryMode::Path => 4,
1411 crate::storage::query::modes::QueryMode::Natural => 5,
1412 crate::storage::query::modes::QueryMode::Unknown => 255,
1413 }
1414}
1415
1416fn mode_from_byte(byte: u8) -> Option<crate::storage::query::modes::QueryMode> {
1417 match byte {
1418 0 => Some(crate::storage::query::modes::QueryMode::Sql),
1419 1 => Some(crate::storage::query::modes::QueryMode::Gremlin),
1420 2 => Some(crate::storage::query::modes::QueryMode::Cypher),
1421 3 => Some(crate::storage::query::modes::QueryMode::Sparql),
1422 4 => Some(crate::storage::query::modes::QueryMode::Path),
1423 5 => Some(crate::storage::query::modes::QueryMode::Natural),
1424 255 => Some(crate::storage::query::modes::QueryMode::Unknown),
1425 _ => None,
1426 }
1427}
1428
/// Maps `value` onto the matching `'static` literal from the fixed set of
/// statement, engine and statement-type names the result cache can persist.
///
/// Matching is exact and case-sensitive. Unknown text yields `None`, which
/// makes an entry non-encodable and a payload non-decodable.
fn result_cache_static_str(value: &str) -> Option<&'static str> {
    const PERSISTABLE: &[&str] = &[
        "select",
        "materialized-graph",
        "runtime-red-schema",
        "runtime-fdw",
        "runtime-table-rls",
        "runtime-table",
        "runtime-join-rls",
        "runtime-join",
        "runtime-vector",
        "runtime-hybrid",
        "runtime-secret",
        "runtime-config",
        "runtime-tenant",
        "runtime-explain",
        "runtime-tree",
        "runtime-kv",
        "runtime-queue",
    ];
    PERSISTABLE.iter().copied().find(|known| *known == value)
}
1451
/// Appends `value` as a 4-byte little-endian `u32`.
///
/// Returns `None`, writing nothing, when `value` does not fit in a `u32`.
fn write_u32(out: &mut Vec<u8>, value: usize) -> Option<()> {
    let narrowed: u32 = value.try_into().ok()?;
    let le = narrowed.to_le_bytes();
    out.extend_from_slice(&le);
    Some(())
}
1457
1458fn write_string(out: &mut Vec<u8>, value: &str) -> Option<()> {
1459 write_u32(out, value.len())?;
1460 out.extend_from_slice(value.as_bytes());
1461 Some(())
1462}
1463
1464fn write_bytes(out: &mut Vec<u8>, value: &[u8]) -> Option<()> {
1465 write_u32(out, value.len())?;
1466 out.extend_from_slice(value);
1467 Some(())
1468}
1469
/// Pops one byte off the front of the cursor `input`.
///
/// Returns `None` and leaves `input` untouched when it is empty.
fn read_u8(input: &mut &[u8]) -> Option<u8> {
    let remaining = *input;
    let first = *remaining.first()?;
    *input = &remaining[1..];
    Some(first)
}
1475
/// Reads a 4-byte little-endian `u32` from the cursor and widens it to `usize`.
///
/// Returns `None` and leaves `input` untouched when fewer than 4 bytes remain.
fn read_u32(input: &mut &[u8]) -> Option<usize> {
    let remaining = *input;
    if remaining.len() < 4 {
        return None;
    }
    let (head, tail) = remaining.split_at(4);
    let raw = u32::from_le_bytes([head[0], head[1], head[2], head[3]]);
    *input = tail;
    Some(raw as usize)
}
1484
/// Reads an 8-byte little-endian `u64` from the cursor.
///
/// Returns `None` and leaves `input` untouched when fewer than 8 bytes remain.
fn read_u64(input: &mut &[u8]) -> Option<u64> {
    let remaining = *input;
    if remaining.len() < 8 {
        return None;
    }
    let (head, tail) = remaining.split_at(8);
    let mut word = [0u8; 8];
    word.copy_from_slice(head);
    *input = tail;
    Some(u64::from_le_bytes(word))
}
1493
1494fn read_string(input: &mut &[u8]) -> Option<String> {
1495 let len = read_u32(input)?;
1496 if input.len() < len {
1497 return None;
1498 }
1499 let value = String::from_utf8(input[..len].to_vec()).ok()?;
1500 *input = &input[len..];
1501 Some(value)
1502}
1503
1504fn read_bytes<'a>(input: &mut &'a [u8]) -> Option<&'a [u8]> {
1505 let len = read_u32(input)?;
1506 if input.len() < len {
1507 return None;
1508 }
1509 let value = &input[..len];
1510 *input = &input[len..];
1511 Some(value)
1512}
1513
/// Serializes a result-cache entry into the `RDRC0001` binary payload that
/// `decode_result_cache_payload` reads back.
///
/// Layout: integers are little-endian; strings and blobs use a `u32` length
/// prefix. The fields are, in order:
/// 1. magic
/// 2. query
/// 3. mode byte
/// 4. statement
/// 5. engine
/// 6. affected_rows (8 bytes)
/// 7. statement_type
/// 8. column count followed by the column names
/// 9. four 8-byte stats: nodes, edges, rows scanned, exec time in µs
/// 10. record count, then for each record a field count and
///     (name, `value_codec`-encoded value) pairs
/// 11. scope count followed by the scope names
///
/// Returns `None`, meaning the entry is not storable in this format, when:
/// - the result carries pre-serialized JSON;
/// - statement, engine or statement_type is outside `result_cache_static_str`;
/// - any record holds nodes, edges, paths or vector results, none of which
///   this format encodes;
/// - any length does not fit in a `u32`.
fn encode_result_cache_payload(entry: &RuntimeResultCacheEntry) -> Option<Vec<u8>> {
    let result = &entry.result;
    // Only plain row results whose static strings the decoder can re-intern are cacheable.
    if result.result.pre_serialized_json.is_some()
        || result_cache_static_str(result.statement).is_none()
        || result_cache_static_str(result.engine).is_none()
        || result_cache_static_str(result.statement_type).is_none()
        || result.result.records.iter().any(|record| {
            !record.nodes.is_empty()
                || !record.edges.is_empty()
                || !record.paths.is_empty()
                || !record.vector_results.is_empty()
        })
    {
        return None;
    }

    let mut out = Vec::new();
    // Header: format tag, then query metadata.
    out.extend_from_slice(RESULT_CACHE_PAYLOAD_MAGIC);
    write_string(&mut out, &result.query)?;
    out.push(mode_to_byte(result.mode));
    write_string(&mut out, result.statement)?;
    write_string(&mut out, result.engine)?;
    out.extend_from_slice(&result.affected_rows.to_le_bytes());
    write_string(&mut out, result.statement_type)?;

    // Column names, then the fixed-width stats block.
    write_u32(&mut out, result.result.columns.len())?;
    for column in &result.result.columns {
        write_string(&mut out, column)?;
    }
    out.extend_from_slice(&result.result.stats.nodes_scanned.to_le_bytes());
    out.extend_from_slice(&result.result.stats.edges_scanned.to_le_bytes());
    out.extend_from_slice(&result.result.stats.rows_scanned.to_le_bytes());
    out.extend_from_slice(&result.result.stats.exec_time_us.to_le_bytes());

    // Records: each value is codec-encoded, then framed so the decoder can
    // check the codec consumed exactly the framed bytes.
    write_u32(&mut out, result.result.records.len())?;
    for record in &result.result.records {
        let fields = record.iter_fields().collect::<Vec<_>>();
        write_u32(&mut out, fields.len())?;
        for (name, value) in fields {
            write_string(&mut out, name)?;
            let mut encoded = Vec::new();
            crate::storage::schema::value_codec::encode(value, &mut encoded);
            write_bytes(&mut out, &encoded)?;
        }
    }

    // Trailer: the cache scopes stored alongside the result.
    write_u32(&mut out, entry.scopes.len())?;
    for scope in &entry.scopes {
        write_string(&mut out, scope)?;
    }
    Some(out)
}
1566
/// Parses a payload produced by `encode_result_cache_payload` back into a
/// `RuntimeQueryResult` and the set of cache scopes stored with it.
///
/// Every read goes through the bounds-checked `read_*` helpers, so malformed
/// input yields `None` rather than a panic. `None` is returned for:
/// - a missing or wrong magic header;
/// - truncated input or invalid UTF-8;
/// - an unknown mode byte;
/// - a statement, engine or statement_type outside `result_cache_static_str`;
/// - a value that fails to decode, or whose decoded length differs from its
///   framed length;
/// - trailing bytes after the scope list.
fn decode_result_cache_payload(mut input: &[u8]) -> Option<(RuntimeQueryResult, HashSet<String>)> {
    // Reject anything that does not start with the exact format tag.
    if input.len() < RESULT_CACHE_PAYLOAD_MAGIC.len()
        || &input[..RESULT_CACHE_PAYLOAD_MAGIC.len()] != RESULT_CACHE_PAYLOAD_MAGIC
    {
        return None;
    }
    input = &input[RESULT_CACHE_PAYLOAD_MAGIC.len()..];

    // Header fields, in the order the encoder wrote them. The three string
    // fields are mapped back onto the fixed static set; unknown text means None.
    let query = read_string(&mut input)?;
    let mode = mode_from_byte(read_u8(&mut input)?)?;
    let statement = result_cache_static_str(&read_string(&mut input)?)?;
    let engine = result_cache_static_str(&read_string(&mut input)?)?;
    let affected_rows = read_u64(&mut input)?;
    let statement_type = result_cache_static_str(&read_string(&mut input)?)?;

    // Counts come from the payload itself. Vectors grow per item rather than
    // being pre-sized, so a bogus count cannot force a large up-front
    // allocation; the loop bails as soon as the input runs out.
    let mut columns = Vec::new();
    for _ in 0..read_u32(&mut input)? {
        columns.push(read_string(&mut input)?);
    }
    let stats = crate::storage::query::unified::QueryStats {
        nodes_scanned: read_u64(&mut input)?,
        edges_scanned: read_u64(&mut input)?,
        rows_scanned: read_u64(&mut input)?,
        exec_time_us: read_u64(&mut input)?,
    };

    let mut records = Vec::new();
    for _ in 0..read_u32(&mut input)? {
        let mut record = crate::storage::query::unified::UnifiedRecord::new();
        for _ in 0..read_u32(&mut input)? {
            let name = read_string(&mut input)?;
            let bytes = read_bytes(&mut input)?;
            let (value, used) = crate::storage::schema::value_codec::decode(bytes).ok()?;
            // The codec must consume exactly the framed bytes; anything else is corruption.
            if used != bytes.len() {
                return None;
            }
            record.set_owned(name, value);
        }
        records.push(record);
    }

    let mut scopes = HashSet::new();
    for _ in 0..read_u32(&mut input)? {
        scopes.insert(read_string(&mut input)?);
    }
    // Trailing garbage means the payload was not produced by the matching encoder.
    if !input.is_empty() {
        return None;
    }

    Some((
        RuntimeQueryResult {
            query,
            mode,
            statement,
            engine,
            result: crate::storage::query::unified::UnifiedResult {
                columns,
                records,
                stats,
                pre_serialized_json: None,
            },
            affected_rows,
            statement_type,
        },
        scopes,
    ))
}
1634
/// If `sql` starts with the `EXPLAIN` keyword (ASCII case-insensitive, leading
/// whitespace allowed), returns the statement after it with leading whitespace
/// trimmed.
///
/// Returns `None` in three cases:
/// - `sql` does not start with `EXPLAIN`;
/// - nothing follows `EXPLAIN`;
/// - the next word is `ALTER`, so `EXPLAIN ALTER …` is left for another handler.
fn strip_explain_prefix(sql: &str) -> Option<&str> {
    let sql = sql.trim_start();
    let keyword_end = sql.find(char::is_whitespace).unwrap_or(sql.len());
    let (keyword, remainder) = sql.split_at(keyword_end);
    if !keyword.eq_ignore_ascii_case("EXPLAIN") {
        return None;
    }

    let inner = remainder.trim_start();
    // `split` always yields at least one piece, so this is the first word of `inner`.
    let first_word = inner.split(char::is_whitespace).next().unwrap_or("");
    match inner {
        "" => None,
        _ if first_word.eq_ignore_ascii_case("ALTER") => None,
        _ => Some(inner),
    }
}
1670
1671pub(super) fn has_with_prefix(sql: &str) -> bool {
1676 let trimmed = sql.trim_start();
1677 let head_end = trimmed
1678 .find(|c: char| c.is_whitespace() || c == '(')
1679 .unwrap_or(trimmed.len());
1680 trimmed[..head_end].eq_ignore_ascii_case("WITH")
1681}
1682
1683fn peek_top_level_as_of(sql: &str) -> Option<crate::application::vcs::AsOfSpec> {
1691 peek_top_level_as_of_with_table(sql).map(|(spec, _)| spec)
1692}
1693
1694pub(super) fn peek_top_level_as_of_with_table(
1699 sql: &str,
1700) -> Option<(crate::application::vcs::AsOfSpec, Option<String>)> {
1701 if !sql
1702 .as_bytes()
1703 .windows(5)
1704 .any(|w| w.eq_ignore_ascii_case(b"as of"))
1705 {
1706 return None;
1707 }
1708 let parsed = crate::storage::query::parser::parse(sql).ok()?;
1709 let crate::storage::query::ast::QueryExpr::Table(table) = parsed.query else {
1710 return None;
1711 };
1712 let clause = table.as_of?;
1713 let table_name = if table.table.is_empty() || table.table == "any" {
1714 None
1715 } else {
1716 Some(table.table.clone())
1717 };
1718 let spec = match clause {
1719 crate::storage::query::ast::AsOfClause::Commit(h) => {
1720 crate::application::vcs::AsOfSpec::Commit(h)
1721 }
1722 crate::storage::query::ast::AsOfClause::Branch(b) => {
1723 crate::application::vcs::AsOfSpec::Branch(b)
1724 }
1725 crate::storage::query::ast::AsOfClause::Tag(t) => crate::application::vcs::AsOfSpec::Tag(t),
1726 crate::storage::query::ast::AsOfClause::TimestampMs(ts) => {
1727 crate::application::vcs::AsOfSpec::TimestampMs(ts)
1728 }
1729 crate::storage::query::ast::AsOfClause::Snapshot(x) => {
1730 crate::application::vcs::AsOfSpec::Snapshot(x)
1731 }
1732 };
1733 Some((spec, table_name))
1734}
1735
1736pub(super) fn query_has_volatile_builtin(sql: &str) -> bool {
1737 const VOLATILE_TOKENS: &[&str] = &[
1741 "pg_advisory_lock",
1742 "pg_try_advisory_lock",
1743 "pg_advisory_unlock",
1744 "random()",
1745 ];
1750 let lowered = sql.to_ascii_lowercase();
1751 VOLATILE_TOKENS.iter().any(|t| lowered.contains(t))
1752}
1753
/// Chooses the pair of lock modes a statement acquires, or `None` when this
/// function assigns it none.
///
/// - Read-style statements get `(IntentShared, IntentShared)`.
/// - DML and `QUEUE MOVE` get `(IntentExclusive, IntentExclusive)`.
/// - All other queue commands are treated like reads.
/// - The listed DDL statements get `(IntentExclusive, Exclusive)`.
///
/// NOTE(review): what each tuple slot locks is not visible here. It is
/// presumably (outer/global scope, per-collection scope); confirm at the caller.
pub(super) fn intent_lock_modes_for(
    expr: &QueryExpr,
) -> Option<(
    crate::storage::transaction::lock::LockMode,
    crate::storage::transaction::lock::LockMode,
)> {
    use crate::storage::transaction::lock::LockMode::{Exclusive, IntentExclusive, IntentShared};

    match expr {
        // Reads.
        QueryExpr::Table(_)
        | QueryExpr::Join(_)
        | QueryExpr::Vector(_)
        | QueryExpr::Hybrid(_)
        | QueryExpr::Graph(_)
        | QueryExpr::Path(_)
        | QueryExpr::Ask(_)
        | QueryExpr::SearchCommand(_)
        | QueryExpr::GraphCommand(_)
        | QueryExpr::QueueSelect(_) => Some((IntentShared, IntentShared)),

        // Row writes; MOVE mutates two queues, so it is treated as a write.
        QueryExpr::Insert(_)
        | QueryExpr::Update(_)
        | QueryExpr::Delete(_)
        | QueryExpr::QueueCommand(QueueCommand::Move { .. }) => {
            Some((IntentExclusive, IntentExclusive))
        }
        // Must stay after the `Move` arm above: match arms are tried in order.
        QueryExpr::QueueCommand(_) => Some((IntentShared, IntentShared)),

        // Schema changes: exclusive in the second slot.
        QueryExpr::CreateTable(_)
        | QueryExpr::DropTable(_)
        | QueryExpr::DropGraph(_)
        | QueryExpr::DropVector(_)
        | QueryExpr::DropDocument(_)
        | QueryExpr::DropKv(_)
        | QueryExpr::DropCollection(_)
        | QueryExpr::Truncate(_)
        | QueryExpr::AlterTable(_)
        | QueryExpr::CreateIndex(_)
        | QueryExpr::DropIndex(_)
        | QueryExpr::CreateTimeSeries(_)
        | QueryExpr::DropTimeSeries(_)
        | QueryExpr::CreateQueue(_)
        | QueryExpr::AlterQueue(_)
        | QueryExpr::DropQueue(_)
        | QueryExpr::CreateTree(_)
        | QueryExpr::DropTree(_)
        | QueryExpr::CreatePolicy(_)
        | QueryExpr::DropPolicy(_)
        | QueryExpr::CreateView(_)
        | QueryExpr::DropView(_)
        | QueryExpr::RefreshMaterializedView(_)
        | QueryExpr::CreateSchema(_)
        | QueryExpr::DropSchema(_)
        | QueryExpr::CreateSequence(_)
        | QueryExpr::DropSequence(_)
        | QueryExpr::CreateServer(_)
        | QueryExpr::DropServer(_)
        | QueryExpr::CreateForeignTable(_)
        | QueryExpr::DropForeignTable(_) => Some((IntentExclusive, Exclusive)),

        // Everything else takes no intent locks through this path.
        _ => None,
    }
}
1842
1843pub(super) fn collections_referenced(expr: &QueryExpr) -> Vec<String> {
1848 let mut out = Vec::new();
1849 walk_collections(expr, &mut out);
1850 out.sort();
1851 out.dedup();
1852 out
1853}
1854
/// Appends every collection name `expr` touches to `out`. Duplicates are
/// allowed; `collections_referenced` sorts and de-duplicates afterwards.
///
/// Joins recurse into both sides, and `QUEUE MOVE` contributes both its
/// source and its destination. Statements without an arm contribute nothing.
///
/// NOTE(review): `intent_lock_modes_for` locks `DropPolicy`, `CreateTree`,
/// `DropTree`, `CreateForeignTable` and `DropForeignTable` exclusively, but
/// they have no arm here, so no collection name is reported for them.
/// `CreatePolicy` does report its table. Confirm this asymmetry is intended.
fn walk_collections(expr: &QueryExpr, out: &mut Vec<String>) {
    match expr {
        QueryExpr::Table(t) => out.push(t.table.clone()),
        QueryExpr::Join(j) => {
            walk_collections(&j.left, out);
            walk_collections(&j.right, out);
        }
        QueryExpr::Insert(i) => out.push(i.table.clone()),
        QueryExpr::Update(u) => out.push(u.table.clone()),
        QueryExpr::Delete(d) => out.push(d.table.clone()),
        QueryExpr::QueueSelect(q) => out.push(q.queue.clone()),

        QueryExpr::CreateTable(q) => out.push(q.name.clone()),
        QueryExpr::DropTable(q) => out.push(q.name.clone()),
        QueryExpr::DropGraph(q) => out.push(q.name.clone()),
        QueryExpr::DropVector(q) => out.push(q.name.clone()),
        QueryExpr::DropDocument(q) => out.push(q.name.clone()),
        QueryExpr::DropKv(q) => out.push(q.name.clone()),
        QueryExpr::DropCollection(q) => out.push(q.name.clone()),
        QueryExpr::Truncate(q) => out.push(q.name.clone()),
        QueryExpr::AlterTable(q) => out.push(q.name.clone()),
        // Index DDL reports the indexed table, not the index name.
        QueryExpr::CreateIndex(q) => out.push(q.table.clone()),
        QueryExpr::DropIndex(q) => out.push(q.table.clone()),
        QueryExpr::CreateTimeSeries(q) => out.push(q.name.clone()),
        QueryExpr::DropTimeSeries(q) => out.push(q.name.clone()),
        QueryExpr::CreateQueue(q) => out.push(q.name.clone()),
        QueryExpr::AlterQueue(q) => out.push(q.name.clone()),
        QueryExpr::DropQueue(q) => out.push(q.name.clone()),
        QueryExpr::QueueCommand(QueueCommand::Move {
            source,
            destination,
            ..
        }) => {
            out.push(source.clone());
            out.push(destination.clone());
        }
        QueryExpr::CreatePolicy(q) => out.push(q.table.clone()),
        QueryExpr::CreateView(q) => out.push(q.name.clone()),
        QueryExpr::DropView(q) => out.push(q.name.clone()),
        QueryExpr::RefreshMaterializedView(q) => out.push(q.name.clone()),

        _ => {}
    }
}
1908
1909impl RedDBRuntime {
1910 pub fn in_memory() -> RedDBResult<Self> {
1911 Self::with_options(RedDBOptions::in_memory())
1912 }
1913
1914 pub fn lock_manager(&self) -> std::sync::Arc<crate::storage::transaction::lock::LockManager> {
1918 self.inner.lock_manager.clone()
1919 }
1920
1921 #[inline(never)]
1922 pub fn with_options(options: RedDBOptions) -> RedDBResult<Self> {
1923 Self::with_pool(options, ConnectionPoolConfig::default())
1924 }
1925
1926 pub fn with_pool(
1927 options: RedDBOptions,
1928 pool_config: ConnectionPoolConfig,
1929 ) -> RedDBResult<Self> {
1930 let boot_open_start_ms = std::time::SystemTime::now()
1938 .duration_since(std::time::UNIX_EPOCH)
1939 .map(|d| d.as_millis() as u64)
1940 .unwrap_or(0);
1941 let db = Arc::new(
1942 RedDB::open_with_options(&options)
1943 .map_err(|err| RedDBError::Internal(err.to_string()))?,
1944 );
1945 let result_blob_cache = crate::storage::cache::BlobCache::open_with_l2(
1946 crate::storage::cache::BlobCacheConfig::default().with_l2_path(
1947 options
1948 .resolved_path("data.rdb")
1949 .with_extension("result-cache.l2"),
1950 ),
1951 )
1952 .map_err(|err| {
1953 RedDBError::Internal(format!("open result Blob Cache L2 failed: {err:?}"))
1954 })?;
1955 let storage_ready_ms = std::time::SystemTime::now()
1956 .duration_since(std::time::UNIX_EPOCH)
1957 .map(|d| d.as_millis() as u64)
1958 .unwrap_or(0);
1959
1960 let runtime = Self {
1961 inner: Arc::new(RuntimeInner {
1962 db,
1963 layout: PhysicalLayout::from_options(&options),
1964 indices: IndexCatalog::register_default_vector_graph(
1965 options.has_capability(crate::api::Capability::Table),
1966 options.has_capability(crate::api::Capability::Graph),
1967 ),
1968 pool_config,
1969 pool: Mutex::new(PoolState::default()),
1970 started_at_unix_ms: SystemTime::now()
1971 .duration_since(UNIX_EPOCH)
1972 .unwrap_or_default()
1973 .as_millis(),
1974 probabilistic: super::probabilistic_store::ProbabilisticStore::new(),
1975 index_store: super::index_store::IndexStore::new(),
1976 cdc: crate::replication::cdc::CdcBuffer::new(100_000),
1977 backup_scheduler: crate::replication::scheduler::BackupScheduler::new(3600),
1978 query_cache: parking_lot::RwLock::new(
1979 crate::storage::query::planner::cache::PlanCache::new(1000),
1980 ),
1981 result_cache: parking_lot::RwLock::new((
1982 HashMap::new(),
1983 std::collections::VecDeque::new(),
1984 )),
1985 result_blob_cache,
1986 result_blob_entries: parking_lot::RwLock::new((
1987 HashMap::new(),
1988 std::collections::VecDeque::new(),
1989 )),
1990 result_cache_shadow_divergences: std::sync::atomic::AtomicU64::new(0),
1991 queue_message_locks: parking_lot::RwLock::new(HashMap::new()),
1992 planner_dirty_tables: parking_lot::RwLock::new(HashSet::new()),
1993 ec_registry: Arc::new(crate::ec::config::EcRegistry::new()),
1994 ec_worker: crate::ec::worker::EcWorker::new(),
1995 auth_store: parking_lot::RwLock::new(None),
1996 oauth_validator: parking_lot::RwLock::new(None),
1997 views: parking_lot::RwLock::new(HashMap::new()),
1998 materialized_views: parking_lot::RwLock::new(
1999 crate::storage::cache::result::MaterializedViewCache::new(),
2000 ),
2001 snapshot_manager: Arc::new(
2002 crate::storage::transaction::snapshot::SnapshotManager::new(),
2003 ),
2004 tx_contexts: parking_lot::RwLock::new(HashMap::new()),
2005 tx_local_tenants: parking_lot::RwLock::new(HashMap::new()),
2006 env_config_overrides: crate::runtime::config_overlay::collect_env_overrides(),
2007 lock_manager: Arc::new({
2008 let env = crate::runtime::config_overlay::collect_env_overrides();
2013 let timeout_ms = env
2014 .get("concurrency.locking.deadlock_timeout_ms")
2015 .and_then(|raw| raw.parse::<u64>().ok())
2016 .unwrap_or_else(|| {
2017 match crate::runtime::config_matrix::default_for(
2018 "concurrency.locking.deadlock_timeout_ms",
2019 ) {
2020 Some(crate::serde_json::Value::Number(n)) => n as u64,
2021 _ => 5000,
2022 }
2023 });
2024 let cfg = crate::storage::transaction::lock::LockConfig {
2025 default_timeout: std::time::Duration::from_millis(timeout_ms),
2026 ..Default::default()
2027 };
2028 crate::storage::transaction::lock::LockManager::new(cfg)
2029 }),
2030 rls_policies: parking_lot::RwLock::new(HashMap::new()),
2031 rls_enabled_tables: parking_lot::RwLock::new(HashSet::new()),
2032 foreign_tables: Arc::new(crate::storage::fdw::ForeignTableRegistry::with_builtins()),
2033 pending_tombstones: parking_lot::RwLock::new(HashMap::new()),
2034 pending_kv_watch_events: parking_lot::RwLock::new(HashMap::new()),
2035 tenant_tables: parking_lot::RwLock::new(HashMap::new()),
2036 ddl_epoch: std::sync::atomic::AtomicU64::new(0),
2037 write_gate: Arc::new(crate::runtime::write_gate::WriteGate::from_options(
2038 &options,
2039 )),
2040 lifecycle: crate::runtime::lifecycle::Lifecycle::new(),
2041 resource_limits: crate::runtime::resource_limits::ResourceLimits::from_env(),
2042 audit_log: {
2043 let data_path = options
2047 .data_path
2048 .clone()
2049 .unwrap_or_else(|| std::env::temp_dir().join("reddb"));
2050 Arc::new(crate::runtime::audit_log::AuditLogger::for_data_path(
2051 &data_path,
2052 ))
2053 },
2054 lease_lifecycle: std::sync::OnceLock::new(),
2055 replica_apply_metrics: crate::replication::logical::ReplicaApplyMetrics::default(),
2056 quota_bucket: crate::runtime::quota_bucket::QuotaBucket::from_env(),
2057 schema_vocabulary: parking_lot::RwLock::new(
2058 crate::runtime::schema_vocabulary::SchemaVocabulary::new(),
2059 ),
2060 slow_query_logger: {
2061 let log_dir = options
2072 .data_path
2073 .as_ref()
2074 .and_then(|p| p.parent().map(std::path::PathBuf::from))
2075 .unwrap_or_else(|| std::env::temp_dir().join("reddb"));
2076 let threshold_ms = std::env::var("RED_SLOW_QUERY_THRESHOLD_MS")
2077 .ok()
2078 .and_then(|s| s.parse::<u64>().ok())
2079 .unwrap_or(1000);
2080 let sample_pct = std::env::var("RED_SLOW_QUERY_SAMPLE_PCT")
2081 .ok()
2082 .and_then(|s| s.parse::<u8>().ok())
2083 .unwrap_or(100);
2084 crate::telemetry::slow_query_logger::SlowQueryLogger::new(
2085 crate::telemetry::slow_query_logger::SlowQueryOpts {
2086 log_dir,
2087 threshold_ms,
2088 sample_pct,
2089 },
2090 )
2091 },
2092 kv_stats: crate::runtime::KvStatsCounters::default(),
2093 kv_tag_index: crate::runtime::KvTagIndex::default(),
2094 }),
2095 };
2096
2097 crate::telemetry::operator_event::install_global_audit_sink(Arc::clone(
2103 &runtime.inner.audit_log,
2104 ));
2105
2106 runtime
2114 .inner
2115 .lifecycle
2116 .set_restore_started_at_ms(boot_open_start_ms);
2117 runtime
2118 .inner
2119 .lifecycle
2120 .set_restore_ready_at_ms(storage_ready_ms);
2121 runtime
2122 .inner
2123 .lifecycle
2124 .set_wal_replay_started_at_ms(boot_open_start_ms);
2125 runtime
2126 .inner
2127 .lifecycle
2128 .set_wal_replay_ready_at_ms(storage_ready_ms);
2129
2130 let restored_cdc_lsn = runtime
2131 .inner
2132 .db
2133 .replication
2134 .as_ref()
2135 .map(|repl| {
2136 repl.logical_wal_spool
2137 .as_ref()
2138 .map(|spool| spool.current_lsn())
2139 .unwrap_or(0)
2140 })
2141 .unwrap_or(0)
2142 .max(runtime.config_u64("red.config.timeline.last_archived_lsn", 0));
2143 runtime.inner.cdc.set_current_lsn(restored_cdc_lsn);
2144 runtime.bootstrap_system_keyed_collections()?;
2145
2146 runtime.rehydrate_tenant_tables();
2150 if let Some(repl) = &runtime.inner.db.replication {
2151 repl.wal_buffer.set_current_lsn(restored_cdc_lsn);
2152 }
2153
2154 {
2156 let sys = SystemInfo::collect();
2157 runtime.inner.db.store().set_config_tree(
2158 "red.system",
2159 &crate::serde_json::json!({
2160 "pid": sys.pid,
2161 "cpu_cores": sys.cpu_cores,
2162 "total_memory_bytes": sys.total_memory_bytes,
2163 "available_memory_bytes": sys.available_memory_bytes,
2164 "os": sys.os,
2165 "arch": sys.arch,
2166 "hostname": sys.hostname,
2167 "started_at": SystemTime::now()
2168 .duration_since(UNIX_EPOCH)
2169 .unwrap_or_default()
2170 .as_millis() as u64
2171 }),
2172 );
2173
2174 let store = runtime.inner.db.store();
2176 if store
2177 .get_collection("red_config")
2178 .map(|m| m.query_all(|_| true).len())
2179 .unwrap_or(0)
2180 <= 10
2181 {
2182 store.set_config_tree("red.ai", &crate::json!({
2183 "default": crate::json!({
2184 "provider": "openai",
2185 "model": crate::ai::DEFAULT_OPENAI_PROMPT_MODEL
2186 }),
2187 "max_embedding_inputs": 256,
2188 "max_prompt_batch": 256,
2189 "timeout": crate::json!({ "connect_secs": 10, "read_secs": 90, "write_secs": 30 })
2190 }));
2191 store.set_config_tree(
2192 "red.server",
2193 &crate::json!({
2194 "max_scan_limit": 1000,
2195 "max_body_size": 1048576,
2196 "read_timeout_ms": 5000,
2197 "write_timeout_ms": 5000
2198 }),
2199 );
2200 store.set_config_tree(
2201 "red.storage",
2202 &crate::json!({
2203 "page_size": 4096,
2204 "page_cache_capacity": 100000,
2205 "auto_checkpoint_pages": 1000,
2206 "snapshot_retention": 16,
2207 "verify_checksums": true,
2208 "segment": crate::json!({
2209 "max_entities": 100000,
2210 "max_bytes": 268435456_u64,
2211 "compression_level": 6
2212 }),
2213 "hnsw": crate::json!({ "m": 16, "ef_construction": 100, "ef_search": 50 }),
2214 "ivf": crate::json!({ "n_lists": 100, "n_probes": 10 }),
2215 "bm25": crate::json!({ "k1": 1.2, "b": 0.75 })
2216 }),
2217 );
2218 store.set_config_tree(
2219 "red.search",
2220 &crate::json!({
2221 "rag": crate::json!({
2222 "max_chunks_per_source": 10,
2223 "max_total_chunks": 25,
2224 "similarity_threshold": 0.8,
2225 "graph_depth": 2,
2226 "min_relevance": 0.3
2227 }),
2228 "fusion": crate::json!({
2229 "vector_weight": 0.5,
2230 "graph_weight": 0.3,
2231 "table_weight": 0.2,
2232 "dedup_threshold": 0.85
2233 })
2234 }),
2235 );
2236 store.set_config_tree(
2237 "red.auth",
2238 &crate::json!({
2239 "enabled": false,
2240 "session_ttl_secs": 3600,
2241 "require_auth": false
2242 }),
2243 );
2244 store.set_config_tree(
2245 "red.query",
2246 &crate::json!({
2247 "connection_pool": crate::json!({ "max_connections": 64, "max_idle": 16 }),
2248 "max_recursion_depth": 1000
2249 }),
2250 );
2251 store.set_config_tree(
2252 "red.indexes",
2253 &crate::json!({
2254 "auto_select": true,
2255 "bloom_filter": crate::json!({
2256 "enabled": true,
2257 "false_positive_rate": 0.01,
2258 "prune_on_scan": true
2259 }),
2260 "hash": crate::json!({ "enabled": true }),
2261 "bitmap": crate::json!({ "enabled": true, "max_cardinality": 1000 }),
2262 "spatial": crate::json!({ "enabled": true })
2263 }),
2264 );
2265 store.set_config_tree(
2266 "red.memtable",
2267 &crate::json!({
2268 "enabled": true,
2269 "max_bytes": 67108864_u64,
2270 "flush_threshold": 0.75
2271 }),
2272 );
2273 store.set_config_tree(
2274 "red.probabilistic",
2275 &crate::json!({
2276 "hll_registers": 16384,
2277 "sketch_default_width": 1000,
2278 "sketch_default_depth": 5,
2279 "filter_default_capacity": 100000
2280 }),
2281 );
2282 store.set_config_tree(
2283 "red.timeseries",
2284 &crate::json!({
2285 "default_chunk_size": 1024,
2286 "compression": crate::json!({
2287 "timestamps": "delta_of_delta",
2288 "values": "gorilla_xor"
2289 }),
2290 "default_retention_days": 0
2291 }),
2292 );
2293 store.set_config_tree(
2294 "red.queue",
2295 &crate::json!({
2296 "default_max_size": 0,
2297 "default_max_attempts": 3,
2298 "visibility_timeout_ms": 30000,
2299 "consumer_idle_timeout_ms": 60000
2300 }),
2301 );
2302 store.set_config_tree(
2303 "red.backup",
2304 &crate::json!({
2305 "enabled": false,
2306 "interval_secs": 3600,
2307 "retention_count": 24,
2308 "upload": false,
2309 "backend": "local"
2310 }),
2311 );
2312 store.set_config_tree(
2313 "red.wal",
2314 &crate::json!({
2315 "archive": crate::json!({
2316 "enabled": false,
2317 "retention_hours": 168,
2318 "prefix": "wal/"
2319 })
2320 }),
2321 );
2322 store.set_config_tree(
2323 "red.cdc",
2324 &crate::json!({
2325 "enabled": true,
2326 "buffer_size": 100000
2327 }),
2328 );
2329 store.set_config_tree(
2330 "red.config.secret",
2331 &crate::json!({
2332 "auto_encrypt": true,
2333 "auto_decrypt": true
2334 }),
2335 );
2336 }
2337
2338 crate::runtime::config_matrix::heal_critical_keys(store.as_ref());
2345
2346 let lehman_yao = runtime.config_bool("storage.btree.lehman_yao", true);
2353 crate::storage::engine::btree::lehman_yao::set_enabled(lehman_yao);
2354 if lehman_yao {
2355 tracing::info!(
2356 "storage.btree.lehman_yao=true — lock-free concurrent descent enabled"
2357 );
2358 }
2359
2360 let overlay_path = crate::runtime::config_overlay::config_file_path();
2365 let _ =
2366 crate::runtime::config_overlay::apply_config_file(store.as_ref(), &overlay_path);
2367 }
2368
2369 {
2373 let store = runtime.inner.db.store();
2374 for name in crate::application::vcs_collections::ALL {
2375 let _ = store.get_or_create_collection(*name);
2376 }
2377 store.set_config_tree(
2380 crate::application::vcs_collections::CONFIG_NAMESPACE,
2381 &crate::json!({
2382 "default_branch": "main",
2383 "author": crate::json!({
2384 "name": "reddb",
2385 "email": "reddb@localhost"
2386 }),
2387 "protected_branches": crate::json!(["main"]),
2388 "closure": crate::json!({
2389 "enabled": true,
2390 "lazy": true
2391 }),
2392 "merge": crate::json!({
2393 "default_strategy": "auto",
2394 "fast_forward": true
2395 })
2396 }),
2397 );
2398 }
2399
2400 {
2403 let store = runtime.inner.db.store();
2404 for name in crate::application::migration_collections::ALL {
2405 let _ = store.get_or_create_collection(*name);
2406 }
2407 }
2408
2409 {
2424 let weak = Arc::downgrade(&runtime.inner);
2425 std::thread::Builder::new()
2426 .name("reddb-maintenance".into())
2427 .spawn(move || {
2428 let tick = std::time::Duration::from_millis(200);
2429 let work_interval = std::time::Duration::from_secs(60);
2430 let mut last_work = std::time::Instant::now();
2431 loop {
2432 std::thread::sleep(tick);
2433 let Some(inner) = weak.upgrade() else {
2434 break;
2437 };
2438 if last_work.elapsed() >= work_interval {
2439 let _stats = inner.db.store().context_index().stats();
2440 last_work = std::time::Instant::now();
2441 }
2442 }
2443 })
2444 .ok();
2445 }
2446
2447 {
2449 let store = runtime.inner.db.store();
2450 let mut backup_enabled = false;
2451 let mut backup_interval = 3600u64;
2452
2453 if let Some(manager) = store.get_collection("red_config") {
2454 manager.for_each_entity(|entity| {
2455 if let Some(row) = entity.data.as_row() {
2456 let key = row.get_field("key").and_then(|v| match v {
2457 crate::storage::schema::Value::Text(s) => Some(s.as_ref()),
2458 _ => None,
2459 });
2460 let val = row.get_field("value");
2461 if key == Some("red.config.backup.enabled") {
2462 backup_enabled = match val {
2463 Some(crate::storage::schema::Value::Boolean(true)) => true,
2464 Some(crate::storage::schema::Value::Text(s)) => &**s == "true",
2465 _ => false,
2466 };
2467 } else if key == Some("red.config.backup.interval_secs") {
2468 if let Some(crate::storage::schema::Value::Integer(n)) = val {
2469 backup_interval = *n as u64;
2470 }
2471 }
2472 }
2473 true
2474 });
2475 }
2476
2477 if backup_enabled {
2478 runtime.inner.backup_scheduler.set_interval(backup_interval);
2479 let rt = runtime.clone();
2480 runtime
2481 .inner
2482 .backup_scheduler
2483 .start(move || rt.trigger_backup().map_err(|e| format!("{}", e)));
2484 }
2485 }
2486
2487 {
2489 runtime
2490 .inner
2491 .ec_registry
2492 .load_from_config_store(runtime.inner.db.store().as_ref());
2493 if !runtime.inner.ec_registry.async_configs().is_empty() {
2494 runtime.inner.ec_worker.start(
2495 Arc::clone(&runtime.inner.ec_registry),
2496 Arc::clone(&runtime.inner.db.store()),
2497 );
2498 }
2499 }
2500
2501 if let crate::replication::ReplicationRole::Replica { primary_addr } =
2502 runtime.inner.db.options().replication.role.clone()
2503 {
2504 let rt = runtime.clone();
2505 std::thread::Builder::new()
2506 .name("reddb-replica".into())
2507 .spawn(move || rt.run_replica_loop(primary_addr))
2508 .ok();
2509 }
2510
2511 runtime.inner.lifecycle.mark_ready();
2516
2517 Ok(runtime)
2518 }
2519
2520 fn bootstrap_system_keyed_collections(&self) -> RedDBResult<()> {
2521 let mut changed = false;
2522 for (name, model) in [
2523 ("red.config", crate::catalog::CollectionModel::Config),
2524 ("red.vault", crate::catalog::CollectionModel::Vault),
2525 ] {
2526 if self.inner.db.store().get_collection(name).is_none() {
2527 self.inner.db.store().get_or_create_collection(name);
2528 changed = true;
2529 }
2530 if self.inner.db.collection_contract(name).is_none() {
2531 self.inner
2532 .db
2533 .save_collection_contract(system_keyed_collection_contract(name, model))
2534 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2535 changed = true;
2536 }
2537 }
2538 if changed {
2539 self.inner
2540 .db
2541 .persist_metadata()
2542 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2543 }
2544 Ok(())
2545 }
2546
2547 pub fn db(&self) -> Arc<RedDB> {
2548 Arc::clone(&self.inner.db)
2549 }
2550
2551 pub fn index_store_ref(&self) -> &super::index_store::IndexStore {
2556 &self.inner.index_store
2557 }
2558
2559 pub(crate) fn schema_vocabulary_apply(
2564 &self,
2565 event: crate::runtime::schema_vocabulary::DdlEvent,
2566 ) {
2567 self.inner.schema_vocabulary.write().on_ddl(event);
2568 }
2569
2570 pub fn schema_vocabulary_lookup(
2575 &self,
2576 token: &str,
2577 ) -> Vec<crate::runtime::schema_vocabulary::VocabHit> {
2578 self.inner.schema_vocabulary.read().lookup(token).to_vec()
2579 }
2580
2581 pub fn set_auth_store(&self, store: Arc<crate::auth::store::AuthStore>) {
2585 *self.inner.auth_store.write() = Some(store);
2586 }
2587
2588 pub fn vault_kv_get(&self, key: &str) -> Option<String> {
2590 self.inner
2591 .auth_store
2592 .read()
2593 .as_ref()
2594 .and_then(|store| store.vault_kv_get(key))
2595 }
2596
2597 pub fn vault_kv_try_set(&self, key: String, value: String) -> RedDBResult<()> {
2600 let store = self.inner.auth_store.read().clone().ok_or_else(|| {
2601 RedDBError::Query("secret storage requires an enabled, unsealed vault".to_string())
2602 })?;
2603 store
2604 .vault_kv_try_set(key, value)
2605 .map_err(|err| RedDBError::Query(err.to_string()))
2606 }
2607
2608 pub fn set_oauth_validator(&self, validator: Option<Arc<crate::auth::oauth::OAuthValidator>>) {
2612 *self.inner.oauth_validator.write() = validator;
2613 }
2614
2615 pub fn oauth_validator(&self) -> Option<Arc<crate::auth::oauth::OAuthValidator>> {
2619 self.inner.oauth_validator.read().clone()
2620 }
2621
2622 pub(crate) fn secret_aes_key(&self) -> Option<[u8; 32]> {
2626 let guard = self.inner.auth_store.read();
2627 guard.as_ref().and_then(|s| s.vault_secret_key())
2628 }
2629
2630 pub(crate) fn config_bool(&self, key: &str, default: bool) -> bool {
2636 if let Some(raw) = self.inner.env_config_overrides.get(key) {
2637 if let Some(crate::storage::schema::Value::Boolean(b)) =
2638 crate::runtime::config_overlay::coerce_env_value(key, raw)
2639 {
2640 return b;
2641 }
2642 }
2643 let store = self.inner.db.store();
2644 let Some(manager) = store.get_collection("red_config") else {
2645 return default;
2646 };
2647 let mut result = default;
2648 let mut latest_id: u64 = 0;
2649 manager.for_each_entity(|entity| {
2650 if let Some(row) = entity.data.as_row() {
2651 let entry_key = row.get_field("key").and_then(|v| match v {
2652 crate::storage::schema::Value::Text(s) => Some(s.as_ref()),
2653 _ => None,
2654 });
2655 if entry_key == Some(key) {
2656 let id = entity.id.raw();
2657 if id >= latest_id {
2658 latest_id = id;
2659 result = match row.get_field("value") {
2660 Some(crate::storage::schema::Value::Boolean(b)) => *b,
2661 Some(crate::storage::schema::Value::Text(s)) => {
2662 matches!(s.as_ref(), "true" | "TRUE" | "True" | "1")
2663 }
2664 Some(crate::storage::schema::Value::Integer(n)) => *n != 0,
2665 _ => default,
2666 };
2667 }
2668 }
2669 }
2670 true
2671 });
2672 result
2673 }
2674
2675 pub(crate) fn config_u64(&self, key: &str, default: u64) -> u64 {
2676 if let Some(raw) = self.inner.env_config_overrides.get(key) {
2677 if let Some(crate::storage::schema::Value::UnsignedInteger(n)) =
2678 crate::runtime::config_overlay::coerce_env_value(key, raw)
2679 {
2680 return n;
2681 }
2682 }
2683 let store = self.inner.db.store();
2684 let Some(manager) = store.get_collection("red_config") else {
2685 return default;
2686 };
2687 let mut result = default;
2688 let mut latest_id: u64 = 0;
2689 manager.for_each_entity(|entity| {
2690 if let Some(row) = entity.data.as_row() {
2691 let entry_key = row.get_field("key").and_then(|v| match v {
2692 crate::storage::schema::Value::Text(s) => Some(s.as_ref()),
2693 _ => None,
2694 });
2695 if entry_key == Some(key) {
2696 let id = entity.id.raw();
2697 if id >= latest_id {
2698 latest_id = id;
2699 result = match row.get_field("value") {
2700 Some(crate::storage::schema::Value::Integer(n)) => *n as u64,
2701 Some(crate::storage::schema::Value::UnsignedInteger(n)) => *n,
2702 Some(crate::storage::schema::Value::Text(s)) => {
2703 s.parse::<u64>().unwrap_or(default)
2704 }
2705 _ => default,
2706 };
2707 }
2708 }
2709 }
2710 true
2711 });
2712 result
2713 }
2714
2715 pub(crate) fn config_string(&self, key: &str, default: &str) -> String {
2716 if let Some(raw) = self.inner.env_config_overrides.get(key) {
2717 return raw.clone();
2718 }
2719 let store = self.inner.db.store();
2720 let Some(manager) = store.get_collection("red_config") else {
2721 return default.to_string();
2722 };
2723 let mut result = default.to_string();
2724 let mut latest_id: u64 = 0;
2725 manager.for_each_entity(|entity| {
2726 if let Some(row) = entity.data.as_row() {
2727 let entry_key = row.get_field("key").and_then(|v| match v {
2728 crate::storage::schema::Value::Text(s) => Some(s.as_ref()),
2729 _ => None,
2730 });
2731 if entry_key == Some(key) {
2732 let id = entity.id.raw();
2733 if id >= latest_id {
2734 latest_id = id;
2735 if let Some(crate::storage::schema::Value::Text(value)) =
2736 row.get_field("value")
2737 {
2738 result = value.to_string();
2739 }
2740 }
2741 }
2742 }
2743 true
2744 });
2745 result
2746 }
2747
2748 fn latest_metadata_for(
2749 &self,
2750 collection: &str,
2751 entity_id: u64,
2752 ) -> Option<crate::serde_json::Value> {
2753 self.inner
2754 .db
2755 .store()
2756 .get_metadata(collection, EntityId::new(entity_id))
2757 .map(|metadata| metadata_to_json(&metadata))
2758 }
2759
2760 fn persist_replica_lsn(&self, lsn: u64) {
2761 self.inner.db.store().set_config_tree(
2762 "red.replication",
2763 &crate::json!({
2764 "last_applied_lsn": lsn
2765 }),
2766 );
2767 }
2768
2769 fn persist_replication_health(
2770 &self,
2771 state: &str,
2772 last_error: &str,
2773 primary_lsn: Option<u64>,
2774 oldest_available_lsn: Option<u64>,
2775 ) {
2776 self.inner.db.store().set_config_tree(
2777 "red.replication",
2778 &crate::json!({
2779 "state": state,
2780 "last_error": last_error,
2781 "last_seen_primary_lsn": primary_lsn.unwrap_or(0),
2782 "last_seen_oldest_lsn": oldest_available_lsn.unwrap_or(0),
2783 "updated_at_unix_ms": SystemTime::now()
2784 .duration_since(UNIX_EPOCH)
2785 .unwrap_or_default()
2786 .as_millis() as u64
2787 }),
2788 );
2789 }
2790
2791 pub(crate) fn secret_auto_encrypt(&self) -> bool {
2794 self.config_bool("red.config.secret.auto_encrypt", true)
2795 }
2796
2797 pub(crate) fn secret_auto_decrypt(&self) -> bool {
2802 self.config_bool("red.config.secret.auto_decrypt", true)
2803 }
2804
2805 pub(crate) fn apply_secret_decryption(&self, result: &mut RuntimeQueryResult) {
2812 if !self.secret_auto_decrypt() {
2813 return;
2814 }
2815 let Some(key) = self.secret_aes_key() else {
2816 return;
2817 };
2818 for record in result.result.records.iter_mut() {
2819 for value in record.values_mut() {
2820 if let Value::Secret(ref bytes) = value {
2821 if let Some(plain) =
2822 super::impl_dml::decrypt_secret_payload(&key, bytes.as_slice())
2823 {
2824 if let Ok(text) = String::from_utf8(plain) {
2825 *value = Value::text(text);
2826 }
2827 }
2828 }
2829 }
2830 }
2831 }
2832
2833 pub(crate) fn mutation_engine(&self) -> crate::runtime::mutation::MutationEngine<'_> {
2841 crate::runtime::mutation::MutationEngine::new(self)
2842 }
2843
2844 pub fn check_write(&self, kind: crate::runtime::write_gate::WriteKind) -> RedDBResult<()> {
2855 self.inner.write_gate.check(kind)
2856 }
2857
2858 pub fn write_gate(&self) -> &crate::runtime::write_gate::WriteGate {
2862 &self.inner.write_gate
2863 }
2864
2865 pub fn lifecycle(&self) -> &crate::runtime::lifecycle::Lifecycle {
2869 &self.inner.lifecycle
2870 }
2871
2872 pub fn resource_limits(&self) -> &crate::runtime::resource_limits::ResourceLimits {
2874 &self.inner.resource_limits
2875 }
2876
2877 pub fn audit_log(&self) -> &crate::runtime::audit_log::AuditLogger {
2879 &self.inner.audit_log
2880 }
2881
2882 pub fn audit_log_arc(&self) -> Arc<crate::runtime::audit_log::AuditLogger> {
2886 Arc::clone(&self.inner.audit_log)
2887 }
2888
2889 pub fn write_gate_arc(&self) -> Arc<crate::runtime::write_gate::WriteGate> {
2894 Arc::clone(&self.inner.write_gate)
2895 }
2896
2897 pub fn lease_lifecycle(&self) -> Option<&Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>> {
2900 self.inner.lease_lifecycle.get()
2901 }
2902
2903 pub fn set_lease_lifecycle(
2906 &self,
2907 lifecycle: Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>,
2908 ) -> Result<(), Arc<crate::runtime::lease_lifecycle::LeaseLifecycle>> {
2909 self.inner.lease_lifecycle.set(lifecycle)
2910 }
2911
2912 pub fn check_batch_size(&self, requested: usize) -> RedDBResult<()> {
2917 if self.inner.resource_limits.batch_size_exceeded(requested) {
2918 let max = self.inner.resource_limits.max_batch_size.unwrap_or(0);
2919 return Err(RedDBError::QuotaExceeded(format!(
2920 "max_batch_size:{requested}:{max}"
2921 )));
2922 }
2923 Ok(())
2924 }
2925
2926 pub fn check_db_size(&self) -> RedDBResult<()> {
2932 let Some(limit) = self.inner.resource_limits.max_db_size_bytes else {
2933 return Ok(());
2934 };
2935 if limit == 0 {
2936 return Ok(());
2937 }
2938 let Some(path) = self.inner.db.path() else {
2939 return Ok(());
2940 };
2941 let current = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
2942 if current > limit {
2943 return Err(RedDBError::QuotaExceeded(format!(
2944 "max_db_size_bytes:{current}:{limit}"
2945 )));
2946 }
2947 Ok(())
2948 }
2949
    /// Performs an orderly shutdown of the runtime and returns a report of what was done.
    ///
    /// If `lifecycle.begin_shutdown()` returns `false` (presumably because shutdown already
    /// started), nothing is done. In that case the stored shutdown report is returned, or a
    /// default report when none is available.
    ///
    /// Otherwise the steps run in this order:
    /// 1. Flush local state with `flush_local_only()`. Both `flushed_wal` and
    ///    `final_checkpoint` record whether that flush succeeded.
    /// 2. Upload to the remote backend, but only if the flush succeeded and
    ///    `assert_remote_write_allowed` passes. Failures are logged, not returned.
    /// 3. If `backup_on_shutdown` is set and a remote backend is configured, trigger a final
    ///    backup and record whether it was uploaded. Failures are logged as warnings.
    /// 4. Stamp completion time and duration, then hand a copy of the report to
    ///    `lifecycle.finish_shutdown`.
    ///
    /// As written, this always returns `Ok`; step failures are only logged.
    pub fn graceful_shutdown(
        &self,
        backup_on_shutdown: bool,
    ) -> RedDBResult<crate::runtime::lifecycle::ShutdownReport> {
        if !self.inner.lifecycle.begin_shutdown() {
            // Shutdown was not started by this call: report what was recorded, if anything.
            return Ok(self.inner.lifecycle.shutdown_report().unwrap_or_default());
        }

        // A clock before the Unix epoch is recorded as 0.
        let started_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        let mut report = crate::runtime::lifecycle::ShutdownReport {
            started_at_ms: started_ms,
            ..Default::default()
        };

        let flush_res = self.inner.db.flush_local_only();
        report.flushed_wal = flush_res.is_ok();
        report.final_checkpoint = flush_res.is_ok();
        // The remote upload is attempted only after a successful local flush, and only while
        // the lease check allows remote writes.
        if let Err(err) = &flush_res {
            tracing::error!(
                target: "reddb::lifecycle",
                error = %err,
                "graceful_shutdown: local flush failed"
            );
        } else if let Err(lease_err) =
            self.assert_remote_write_allowed("shutdown/checkpoint_upload")
        {
            tracing::warn!(
                target: "reddb::serverless::lease",
                error = %lease_err,
                "graceful_shutdown: remote upload skipped — lease not held"
            );
        } else if let Err(err) = self.inner.db.upload_to_remote_backend() {
            tracing::error!(
                target: "reddb::lifecycle",
                error = %err,
                "graceful_shutdown: remote upload failed"
            );
        }

        // Optional final backup; it only makes sense when a remote backend exists.
        if backup_on_shutdown && self.inner.db.remote_backend.is_some() {
            match self.trigger_backup() {
                Ok(result) => {
                    report.backup_uploaded = result.uploaded;
                }
                Err(err) => {
                    tracing::warn!(
                        target: "reddb::lifecycle",
                        error = %err,
                        "graceful_shutdown: final backup skipped"
                    );
                }
            }
        }

        // If the clock read fails here, fall back to the start time (duration 0).
        let completed_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(started_ms);
        report.completed_at_ms = completed_ms;
        report.duration_ms = completed_ms.saturating_sub(started_ms);

        self.inner.lifecycle.finish_shutdown(report.clone());
        Ok(report)
    }
3051
3052 pub(crate) fn cdc_emit_no_cache_invalidate(
3058 &self,
3059 operation: crate::replication::cdc::ChangeOperation,
3060 collection: &str,
3061 entity_id: u64,
3062 entity_kind: &str,
3063 ) -> u64 {
3064 let lsn = self
3065 .inner
3066 .cdc
3067 .emit(operation, collection, entity_id, entity_kind);
3068
3069 if let Some(ref primary) = self.inner.db.replication {
3071 let store = self.inner.db.store();
3072 let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
3073 None
3074 } else {
3075 store.get(collection, EntityId::new(entity_id))
3076 };
3077 let record = ChangeRecord {
3078 lsn,
3079 timestamp: SystemTime::now()
3080 .duration_since(UNIX_EPOCH)
3081 .unwrap_or_default()
3082 .as_millis() as u64,
3083 operation,
3084 collection: collection.to_string(),
3085 entity_id,
3086 entity_kind: entity_kind.to_string(),
3087 entity_bytes: entity
3088 .as_ref()
3089 .map(|e| UnifiedStore::serialize_entity(e, store.format_version())),
3090 metadata: self.latest_metadata_for(collection, entity_id),
3091 };
3092 let encoded = record.encode();
3093 primary.wal_buffer.append(record.lsn, encoded.clone());
3094 if let Some(spool) = &primary.logical_wal_spool {
3095 let _ = spool.append(record.lsn, &encoded);
3096 }
3097 }
3098 lsn
3099 }
3100
3101 pub(crate) fn cdc_emit_insert_batch_no_cache_invalidate(
3102 &self,
3103 collection: &str,
3104 ids: &[EntityId],
3105 entity_kind: &str,
3106 ) -> Vec<u64> {
3107 if ids.is_empty() {
3108 return Vec::new();
3109 }
3110
3111 if self.inner.db.replication.is_none() {
3115 return self.inner.cdc.emit_batch_same_collection(
3116 crate::replication::cdc::ChangeOperation::Insert,
3117 collection,
3118 entity_kind,
3119 ids.iter().map(|id| id.raw()),
3120 );
3121 }
3122
3123 ids.iter()
3126 .map(|id| {
3127 self.cdc_emit_no_cache_invalidate(
3128 crate::replication::cdc::ChangeOperation::Insert,
3129 collection,
3130 id.raw(),
3131 entity_kind,
3132 )
3133 })
3134 .collect()
3135 }
3136
3137 pub fn cdc_emit(
3138 &self,
3139 operation: crate::replication::cdc::ChangeOperation,
3140 collection: &str,
3141 entity_id: u64,
3142 entity_kind: &str,
3143 ) -> u64 {
3144 let lsn = self
3145 .inner
3146 .cdc
3147 .emit(operation, collection, entity_id, entity_kind);
3148 self.invalidate_result_cache_for_table(collection);
3154
3155 if let Some(ref primary) = self.inner.db.replication {
3157 let store = self.inner.db.store();
3158 let entity = if operation == crate::replication::cdc::ChangeOperation::Delete {
3159 None
3160 } else {
3161 store.get(collection, EntityId::new(entity_id))
3162 };
3163 let record = ChangeRecord {
3164 lsn,
3165 timestamp: SystemTime::now()
3166 .duration_since(UNIX_EPOCH)
3167 .unwrap_or_default()
3168 .as_millis() as u64,
3169 operation,
3170 collection: collection.to_string(),
3171 entity_id,
3172 entity_kind: entity_kind.to_string(),
3173 entity_bytes: entity
3174 .as_ref()
3175 .map(|entity| UnifiedStore::serialize_entity(entity, store.format_version())),
3176 metadata: self.latest_metadata_for(collection, entity_id),
3177 };
3178 let encoded = record.encode();
3179 primary.wal_buffer.append(record.lsn, encoded.clone());
3180 if let Some(spool) = &primary.logical_wal_spool {
3181 let _ = spool.append(record.lsn, &encoded);
3182 }
3183 }
3184 lsn
3185 }
3186
3187 pub(crate) fn cdc_emit_kv(
3188 &self,
3189 operation: crate::replication::cdc::ChangeOperation,
3190 collection: &str,
3191 key: &str,
3192 entity_id: u64,
3193 before: Option<crate::json::Value>,
3194 after: Option<crate::json::Value>,
3195 ) -> u64 {
3196 let lsn = self
3197 .inner
3198 .cdc
3199 .emit_kv(operation, collection, key, entity_id, before, after);
3200 self.inner.kv_stats.incr_watch_events_emitted();
3201 self.invalidate_result_cache_for_table(collection);
3202 lsn
3203 }
3204
3205 pub(crate) fn record_kv_watch_event(
3206 &self,
3207 operation: crate::replication::cdc::ChangeOperation,
3208 collection: &str,
3209 key: &str,
3210 entity_id: u64,
3211 before: Option<crate::json::Value>,
3212 after: Option<crate::json::Value>,
3213 ) {
3214 if self.current_xid().is_some() {
3215 let conn_id = current_connection_id();
3216 let event = crate::replication::cdc::KvWatchEvent {
3217 collection: collection.to_string(),
3218 key: key.to_string(),
3219 op: operation,
3220 before,
3221 after,
3222 lsn: 0,
3223 committed_at: 0,
3224 dropped_event_count: 0,
3225 };
3226 self.inner
3227 .pending_kv_watch_events
3228 .write()
3229 .entry(conn_id)
3230 .or_default()
3231 .push(event);
3232 return;
3233 }
3234
3235 self.cdc_emit_kv(operation, collection, key, entity_id, before, after);
3236 }
3237
3238 pub(crate) fn cdc_emit_prebuilt(
3239 &self,
3240 operation: crate::replication::cdc::ChangeOperation,
3241 collection: &str,
3242 entity: &UnifiedEntity,
3243 entity_kind: &str,
3244 metadata: Option<&crate::storage::Metadata>,
3245 invalidate_cache: bool,
3246 ) -> u64 {
3247 self.cdc_emit_prebuilt_with_columns(
3248 operation,
3249 collection,
3250 entity,
3251 entity_kind,
3252 metadata,
3253 invalidate_cache,
3254 None,
3255 )
3256 }
3257
3258 pub(crate) fn cdc_emit_prebuilt_with_columns(
3265 &self,
3266 operation: crate::replication::cdc::ChangeOperation,
3267 collection: &str,
3268 entity: &UnifiedEntity,
3269 entity_kind: &str,
3270 metadata: Option<&crate::storage::Metadata>,
3271 invalidate_cache: bool,
3272 changed_columns: Option<Vec<String>>,
3273 ) -> u64 {
3274 if invalidate_cache {
3275 self.invalidate_result_cache();
3276 }
3277
3278 let lsn = self.inner.cdc.emit_with_columns(
3279 operation,
3280 collection,
3281 entity.id.raw(),
3282 entity_kind,
3283 changed_columns,
3284 );
3285
3286 if let Some(ref primary) = self.inner.db.replication {
3287 let store = self.inner.db.store();
3288 let record = ChangeRecord {
3289 lsn,
3290 timestamp: SystemTime::now()
3291 .duration_since(UNIX_EPOCH)
3292 .unwrap_or_default()
3293 .as_millis() as u64,
3294 operation,
3295 collection: collection.to_string(),
3296 entity_id: entity.id.raw(),
3297 entity_kind: entity_kind.to_string(),
3298 entity_bytes: Some(UnifiedStore::serialize_entity(
3299 entity,
3300 store.format_version(),
3301 )),
3302 metadata: metadata
3303 .map(metadata_to_json)
3304 .or_else(|| self.latest_metadata_for(collection, entity.id.raw())),
3305 };
3306 let encoded = record.encode();
3307 primary.wal_buffer.append(record.lsn, encoded.clone());
3308 if let Some(spool) = &primary.logical_wal_spool {
3309 let _ = spool.append(record.lsn, &encoded);
3310 }
3311 }
3312
3313 lsn
3314 }
3315
3316 pub(crate) fn cdc_emit_prebuilt_batch<'a, I>(
3317 &self,
3318 operation: crate::replication::cdc::ChangeOperation,
3319 entity_kind: &str,
3320 items: I,
3321 invalidate_cache: bool,
3322 ) where
3323 I: IntoIterator<
3324 Item = (
3325 &'a str,
3326 &'a UnifiedEntity,
3327 Option<&'a crate::storage::Metadata>,
3328 ),
3329 >,
3330 {
3331 let items: Vec<(&str, &UnifiedEntity, Option<&crate::storage::Metadata>)> =
3332 items.into_iter().collect();
3333 if items.is_empty() {
3334 return;
3335 }
3336
3337 if invalidate_cache {
3338 self.invalidate_result_cache();
3339 }
3340
3341 for (collection, entity, metadata) in items {
3342 self.cdc_emit_prebuilt(operation, collection, entity, entity_kind, metadata, false);
3343 }
3344 }
3345
3346 fn run_replica_loop(&self, primary_addr: String) {
3347 let endpoint = if primary_addr.starts_with("http") {
3348 primary_addr
3349 } else {
3350 format!("http://{primary_addr}")
3351 };
3352 let poll_ms = self.inner.db.options().replication.poll_interval_ms;
3353 let max_count = self.inner.db.options().replication.max_batch_size;
3354 let mut since_lsn = self.config_u64("red.replication.last_applied_lsn", 0);
3355
3356 let runtime = match tokio::runtime::Builder::new_current_thread()
3357 .enable_all()
3358 .build()
3359 {
3360 Ok(runtime) => runtime,
3361 Err(_) => return,
3362 };
3363
3364 runtime.block_on(async move {
3365 use crate::grpc::proto::red_db_client::RedDbClient;
3366 use crate::grpc::proto::JsonPayloadRequest;
3367
3368 let mut client = loop {
3369 match RedDbClient::connect(endpoint.clone()).await {
3370 Ok(client) => {
3371 self.persist_replication_health("connecting", "", None, None);
3372 break client;
3373 }
3374 Err(_) => {
3375 self.persist_replication_health(
3376 "connecting",
3377 "waiting for primary connection",
3378 None,
3379 None,
3380 );
3381 std::thread::sleep(std::time::Duration::from_millis(poll_ms.max(250)))
3382 }
3383 }
3384 };
3385
3386 let applier = crate::replication::logical::LogicalChangeApplier::new(since_lsn);
3391
3392 loop {
3393 let payload = crate::json!({
3394 "since_lsn": since_lsn,
3395 "max_count": max_count
3396 });
3397 let request = tonic::Request::new(JsonPayloadRequest {
3398 payload_json: crate::json::to_string(&payload)
3399 .unwrap_or_else(|_| "{}".to_string()),
3400 });
3401
3402 if let Ok(response) = client.pull_wal_records(request).await {
3403 if let Ok(value) =
3404 crate::json::from_str::<crate::json::Value>(&response.into_inner().payload)
3405 {
3406 let current_lsn =
3407 value.get("current_lsn").and_then(crate::json::Value::as_u64);
3408 let oldest_available_lsn = value
3409 .get("oldest_available_lsn")
3410 .and_then(crate::json::Value::as_u64);
3411 if since_lsn > 0
3412 && oldest_available_lsn
3413 .map(|oldest| oldest > since_lsn.saturating_add(1))
3414 .unwrap_or(false)
3415 {
3416 self.persist_replication_health(
3417 "stalled_gap",
3418 "replica is behind the oldest logical WAL available on primary; re-bootstrap required",
3419 current_lsn,
3420 oldest_available_lsn,
3421 );
3422 std::thread::sleep(std::time::Duration::from_millis(poll_ms.max(250)));
3423 continue;
3424 }
3425 if let Some(records) =
3426 value.get("records").and_then(crate::json::Value::as_array)
3427 {
3428 for record in records {
3429 let Some(data_hex) =
3430 record.get("data").and_then(crate::json::Value::as_str)
3431 else {
3432 continue;
3433 };
3434 let Ok(data) = hex::decode(data_hex) else {
3435 self.inner.replica_apply_metrics.record(
3436 crate::replication::logical::ApplyErrorKind::Decode,
3437 );
3438 self.persist_replication_health(
3439 "apply_error",
3440 "failed to decode WAL record hex payload",
3441 current_lsn,
3442 oldest_available_lsn,
3443 );
3444 continue;
3445 };
3446 let Ok(change) = ChangeRecord::decode(&data) else {
3447 self.inner.replica_apply_metrics.record(
3448 crate::replication::logical::ApplyErrorKind::Decode,
3449 );
3450 self.persist_replication_health(
3451 "apply_error",
3452 "failed to decode logical WAL record",
3453 current_lsn,
3454 oldest_available_lsn,
3455 );
3456 continue;
3457 };
3458 match applier.apply(
3459 self.inner.db.as_ref(),
3460 &change,
3461 ApplyMode::Replica,
3462 ) {
3463 Ok(crate::replication::logical::ApplyOutcome::Applied) => {
3464 since_lsn = since_lsn.max(change.lsn);
3465 self.persist_replica_lsn(since_lsn);
3466 }
3467 Ok(_) => {
3468 }
3470 Err(err) => {
3471 self.inner.replica_apply_metrics.record(err.kind());
3472 match &err {
3481 crate::replication::logical::LogicalApplyError::Divergence { lsn, expected: _, got: _ } => {
3482 crate::telemetry::operator_event::OperatorEvent::Divergence {
3483 peer: "primary".to_string(),
3484 leader_lsn: *lsn,
3485 follower_lsn: since_lsn,
3486 }
3487 .emit_global();
3488 }
3489 crate::replication::logical::LogicalApplyError::Gap { last, next } => {
3490 crate::telemetry::operator_event::OperatorEvent::ReplicationBroken {
3491 peer: "primary".to_string(),
3492 reason: format!("stalled gap last={last} next={next}"),
3493 }
3494 .emit_global();
3495 }
3496 _ => {}
3497 }
3498 let kind = match &err {
3499 crate::replication::logical::LogicalApplyError::Gap { .. } => "stalled_gap",
3500 crate::replication::logical::LogicalApplyError::Divergence { .. } => "divergence",
3501 _ => "apply_error",
3502 };
3503 self.persist_replication_health(
3504 kind,
3505 &format!("replica apply rejected: {err}"),
3506 current_lsn,
3507 oldest_available_lsn,
3508 );
3509 break;
3520 }
3521 }
3522 }
3523 }
3524 self.persist_replication_health(
3525 "healthy",
3526 "",
3527 current_lsn,
3528 oldest_available_lsn,
3529 );
3530 } else {
3531 self.persist_replication_health(
3532 "apply_error",
3533 "failed to parse pull_wal_records response",
3534 None,
3535 None,
3536 );
3537 }
3538 } else {
3539 self.persist_replication_health(
3540 "connecting",
3541 "primary pull_wal_records request failed",
3542 None,
3543 None,
3544 );
3545 }
3546
3547 std::thread::sleep(std::time::Duration::from_millis(poll_ms));
3548 }
3549 });
3550 }
3551
3552 pub fn cdc_poll(
3554 &self,
3555 since_lsn: u64,
3556 max_count: usize,
3557 ) -> Vec<crate::replication::cdc::ChangeEvent> {
3558 self.inner.cdc.poll(since_lsn, max_count)
3559 }
3560
3561 pub fn cdc_current_lsn(&self) -> u64 {
3565 self.inner.cdc.current_lsn()
3566 }
3567
3568 pub fn kv_watch_events_since(
3569 &self,
3570 collection: &str,
3571 key: &str,
3572 since_lsn: u64,
3573 max_count: usize,
3574 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
3575 self.inner
3576 .cdc
3577 .poll(since_lsn, max_count)
3578 .into_iter()
3579 .filter_map(|event| event.kv)
3580 .filter(|event| event.collection == collection && event.key == key)
3581 .collect()
3582 }
3583
3584 pub fn kv_watch_events_since_prefix(
3585 &self,
3586 collection: &str,
3587 prefix: &str,
3588 since_lsn: u64,
3589 max_count: usize,
3590 ) -> Vec<crate::replication::cdc::KvWatchEvent> {
3591 self.inner
3592 .cdc
3593 .poll(since_lsn, max_count)
3594 .into_iter()
3595 .filter_map(|event| event.kv)
3596 .filter(|event| event.collection == collection && event.key.starts_with(prefix))
3597 .collect()
3598 }
3599
3600 pub(crate) fn kv_watch_subscribe<'a>(
3601 &'a self,
3602 collection: impl Into<String>,
3603 key: impl Into<String>,
3604 from_lsn: Option<u64>,
3605 ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
3606 crate::runtime::kv_watch::KvWatchStream::subscribe(
3607 &self.inner.cdc,
3608 &self.inner.kv_stats,
3609 collection,
3610 key,
3611 from_lsn,
3612 self.kv_watch_idle_timeout_ms(),
3613 )
3614 }
3615
3616 pub(crate) fn kv_watch_subscribe_prefix<'a>(
3617 &'a self,
3618 collection: impl Into<String>,
3619 prefix: impl Into<String>,
3620 from_lsn: Option<u64>,
3621 ) -> crate::runtime::kv_watch::KvWatchStream<'a> {
3622 crate::runtime::kv_watch::KvWatchStream::subscribe_prefix(
3623 &self.inner.cdc,
3624 &self.inner.kv_stats,
3625 collection,
3626 prefix,
3627 from_lsn,
3628 self.kv_watch_idle_timeout_ms(),
3629 )
3630 }
3631
3632 pub(crate) fn kv_watch_idle_timeout_ms(&self) -> u64 {
3633 self.config_u64("red.config.kv.watch.idle_timeout_ms", 60_000)
3634 }
3635
3636 pub fn backup_status(&self) -> crate::replication::scheduler::BackupStatus {
3638 self.inner.backup_scheduler.status()
3639 }
3640
3641 pub fn result_blob_cache(&self) -> &crate::storage::cache::BlobCache {
3651 &self.inner.result_blob_cache
3652 }
3653
3654 pub fn primary_replica_snapshots(&self) -> Vec<crate::replication::primary::ReplicaState> {
3658 self.inner
3659 .db
3660 .replication
3661 .as_ref()
3662 .map(|repl| repl.replica_snapshots())
3663 .unwrap_or_default()
3664 }
3665
3666 pub fn commit_policy(&self) -> crate::replication::CommitPolicy {
3671 crate::replication::CommitPolicy::from_env()
3672 }
3673
3674 pub fn replica_apply_error_counts(
3679 &self,
3680 ) -> [(crate::replication::logical::ApplyErrorKind, u64); 4] {
3681 self.inner.replica_apply_metrics.snapshot()
3682 }
3683
3684 pub fn quota_bucket(&self) -> &crate::runtime::quota_bucket::QuotaBucket {
3687 &self.inner.quota_bucket
3688 }
3689
3690 pub fn commit_waiter_snapshot(&self) -> Vec<(String, u64)> {
3694 self.inner
3695 .db
3696 .replication
3697 .as_ref()
3698 .map(|repl| repl.commit_waiter.snapshot())
3699 .unwrap_or_default()
3700 }
3701
3702 pub fn commit_waiter_metrics_snapshot(&self) -> (u64, u64, u64, u64) {
3705 self.inner
3706 .db
3707 .replication
3708 .as_ref()
3709 .map(|repl| repl.commit_waiter.metrics_snapshot())
3710 .unwrap_or((0, 0, 0, 0))
3711 }
3712
3713 pub fn await_replica_acks(
3723 &self,
3724 target_lsn: u64,
3725 count: u32,
3726 timeout: std::time::Duration,
3727 ) -> crate::replication::AwaitOutcome {
3728 match &self.inner.db.replication {
3729 Some(repl) => repl.commit_waiter.await_acks(target_lsn, count, timeout),
3730 None => {
3731 crate::replication::AwaitOutcome::NotRequired
3735 }
3736 }
3737 }
3738
3739 pub fn enforce_commit_policy(
3753 &self,
3754 post_lsn: u64,
3755 ) -> RedDBResult<crate::replication::AwaitOutcome> {
3756 let n = match self.commit_policy() {
3757 crate::replication::CommitPolicy::AckN(n) if n > 0 => n,
3758 _ => return Ok(crate::replication::AwaitOutcome::NotRequired),
3759 };
3760 let timeout_ms = std::env::var("RED_REPLICATION_ACK_TIMEOUT_MS")
3761 .ok()
3762 .and_then(|v| v.parse::<u64>().ok())
3763 .unwrap_or(5_000);
3764 let outcome =
3765 self.await_replica_acks(post_lsn, n, std::time::Duration::from_millis(timeout_ms));
3766 if let crate::replication::AwaitOutcome::TimedOut { observed, required } = &outcome {
3767 tracing::warn!(
3768 target: "reddb::commit",
3769 post_lsn,
3770 observed = *observed,
3771 required = *required,
3772 timeout_ms,
3773 "ack_n: timed out waiting for replicas"
3774 );
3775 let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
3776 .ok()
3777 .map(|v| {
3778 let t = v.trim();
3779 t.eq_ignore_ascii_case("true") || t == "1" || t.eq_ignore_ascii_case("yes")
3780 })
3781 .unwrap_or(false);
3782 if fail {
3783 return Err(RedDBError::ReadOnly(format!(
3784 "commit policy timed out at lsn {post_lsn}: observed={observed} required={required} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
3785 )));
3786 }
3787 }
3788 Ok(outcome)
3789 }
3790
3791 pub fn encryption_at_rest_status(&self) -> (&'static str, Option<String>) {
3799 match crate::crypto::page_encryption::key_from_env() {
3800 Ok(Some(_)) => ("enabled", None),
3801 Ok(None) => ("disabled", None),
3802 Err(err) => ("error", Some(err)),
3803 }
3804 }
3805
3806 pub fn replica_apply_health(&self) -> Option<String> {
3812 let state = self.config_string("red.replication.state", "");
3813 if state.is_empty() {
3814 None
3815 } else {
3816 Some(state)
3817 }
3818 }
3819
3820 pub fn wal_archive_progress(&self) -> (u64, u64) {
3825 let current_lsn = self
3826 .inner
3827 .db
3828 .replication
3829 .as_ref()
3830 .map(|repl| {
3831 repl.logical_wal_spool
3832 .as_ref()
3833 .map(|spool| spool.current_lsn())
3834 .unwrap_or_else(|| repl.wal_buffer.current_lsn())
3835 })
3836 .unwrap_or_else(|| self.inner.cdc.current_lsn());
3837 let last_archived_lsn = self.config_u64("red.config.timeline.last_archived_lsn", 0);
3838 (current_lsn, last_archived_lsn)
3839 }
3840
3841 pub fn trigger_backup(&self) -> RedDBResult<crate::replication::scheduler::BackupResult> {
3843 self.check_write(crate::runtime::write_gate::WriteKind::Backup)?;
3844 self.assert_remote_write_allowed("admin/backup")?;
3849 let started = std::time::Instant::now();
3850 let snapshot = self.create_snapshot()?;
3851 let mut uploaded = false;
3852
3853 if let (Some(backend), Some(path)) = (&self.inner.db.remote_backend, self.inner.db.path()) {
3854 let default_snapshot_prefix = self.inner.db.options().default_snapshot_prefix();
3855 let default_wal_prefix = self.inner.db.options().default_wal_archive_prefix();
3856 let default_head_key = self.inner.db.options().default_backup_head_key();
3857 let snapshot_prefix = self.config_string(
3858 "red.config.backup.snapshot_prefix",
3859 &default_snapshot_prefix,
3860 );
3861 let wal_prefix =
3862 self.config_string("red.config.wal.archive.prefix", &default_wal_prefix);
3863 let head_key = self.config_string("red.config.backup.head_key", &default_head_key);
3864 let timeline_id = self.config_string("red.config.timeline.id", "main");
3865 let snapshot_key = crate::storage::wal::archive_snapshot(
3866 backend.as_ref(),
3867 path,
3868 snapshot.snapshot_id,
3869 &snapshot_prefix,
3870 )
3871 .map_err(|err| RedDBError::Internal(err.to_string()))?;
3872 let current_lsn = self
3873 .inner
3874 .db
3875 .replication
3876 .as_ref()
3877 .map(|repl| {
3878 repl.logical_wal_spool
3879 .as_ref()
3880 .map(|spool| spool.current_lsn())
3881 .unwrap_or_else(|| repl.wal_buffer.current_lsn())
3882 })
3883 .unwrap_or_else(|| self.inner.cdc.current_lsn());
3884 let last_archived_lsn = self.config_u64("red.config.timeline.last_archived_lsn", 0);
3885 let snapshot_sha256 =
3891 crate::storage::wal::SnapshotManifest::compute_snapshot_sha256(path)
3892 .map_err(|err| {
3893 tracing::warn!(
3894 target: "reddb::backup",
3895 error = %err,
3896 snapshot_id = snapshot.snapshot_id,
3897 "snapshot hash failed; manifest will lack checksum"
3898 );
3899 })
3900 .ok();
3901 let manifest = crate::storage::wal::SnapshotManifest {
3902 timeline_id: timeline_id.clone(),
3903 snapshot_key: snapshot_key.clone(),
3904 snapshot_id: snapshot.snapshot_id,
3905 snapshot_time: snapshot.created_at_unix_ms as u64,
3906 base_lsn: current_lsn,
3907 schema_version: crate::api::REDDB_FORMAT_VERSION,
3908 format_version: crate::api::REDDB_FORMAT_VERSION,
3909 snapshot_sha256,
3910 };
3911 crate::storage::wal::publish_snapshot_manifest(backend.as_ref(), &manifest)
3912 .map_err(|err| RedDBError::Internal(err.to_string()))?;
3913
3914 let prev_segment_hash = self.config_string("red.config.timeline.last_segment_hash", "");
3921 let prev_hash_arg = if prev_segment_hash.is_empty() {
3922 None
3923 } else {
3924 Some(prev_segment_hash)
3925 };
3926
3927 let archived_lsn = if let Some(primary) = &self.inner.db.replication {
3928 let oldest = primary
3929 .logical_wal_spool
3930 .as_ref()
3931 .and_then(|spool| spool.oldest_lsn().ok().flatten())
3932 .or_else(|| primary.wal_buffer.oldest_lsn())
3933 .unwrap_or(last_archived_lsn);
3934 if last_archived_lsn > 0 && last_archived_lsn < oldest.saturating_sub(1) {
3935 return Err(RedDBError::Internal(format!(
3936 "logical WAL gap detected: last_archived_lsn={last_archived_lsn}, oldest_available_lsn={oldest}"
3937 )));
3938 }
3939 let records = if let Some(spool) = &primary.logical_wal_spool {
3940 spool
3941 .read_since(last_archived_lsn, usize::MAX)
3942 .map_err(|err| RedDBError::Internal(err.to_string()))?
3943 } else {
3944 primary.wal_buffer.read_since(last_archived_lsn, usize::MAX)
3945 };
3946 if let Some(meta) = crate::storage::wal::archive_change_records(
3947 backend.as_ref(),
3948 &wal_prefix,
3949 &records,
3950 prev_hash_arg,
3951 )
3952 .map_err(|err| RedDBError::Internal(err.to_string()))?
3953 {
3954 if let Some(spool) = &primary.logical_wal_spool {
3955 let _ = spool.prune_through(meta.lsn_end);
3956 }
3957 if let Some(sha) = &meta.sha256 {
3963 self.inner.db.store().set_config_tree(
3964 "red.config.timeline",
3965 &crate::json!({ "last_segment_hash": sha }),
3966 );
3967 }
3968 meta.lsn_end
3969 } else {
3970 last_archived_lsn
3971 }
3972 } else {
3973 last_archived_lsn
3974 };
3975
3976 let head = crate::storage::wal::BackupHead {
3977 timeline_id,
3978 snapshot_key,
3979 snapshot_id: snapshot.snapshot_id,
3980 snapshot_time: snapshot.created_at_unix_ms as u64,
3981 current_lsn,
3982 last_archived_lsn: archived_lsn,
3983 wal_prefix,
3984 };
3985 crate::storage::wal::publish_backup_head(backend.as_ref(), &head_key, &head)
3986 .map_err(|err| RedDBError::Internal(err.to_string()))?;
3987 self.inner.db.store().set_config_tree(
3988 "red.config.timeline",
3989 &crate::json!({
3990 "last_archived_lsn": archived_lsn,
3991 "id": head.timeline_id
3992 }),
3993 );
3994
3995 if let Err(err) = crate::storage::wal::publish_unified_manifest_for_prefix(
4003 backend.as_ref(),
4004 &snapshot_prefix,
4005 ) {
4006 tracing::warn!(
4007 target: "reddb::backup",
4008 error = %err,
4009 snapshot_prefix = %snapshot_prefix,
4010 "unified MANIFEST.json refresh failed; per-artifact sidecars unaffected"
4011 );
4012 }
4013
4014 match self.commit_policy() {
4026 crate::replication::CommitPolicy::AckN(n) if n > 0 => {
4027 let timeout = std::env::var("RED_REPLICATION_ACK_TIMEOUT_MS")
4028 .ok()
4029 .and_then(|v| v.parse::<u64>().ok())
4030 .unwrap_or(5_000);
4031 let outcome = self.await_replica_acks(
4032 archived_lsn,
4033 n,
4034 std::time::Duration::from_millis(timeout),
4035 );
4036 match outcome {
4037 crate::replication::AwaitOutcome::Reached(count) => {
4038 tracing::debug!(
4039 target: "reddb::backup",
4040 archived_lsn,
4041 n,
4042 count,
4043 "ack_n: replicas synced before backup return"
4044 );
4045 }
4046 crate::replication::AwaitOutcome::TimedOut { observed, required } => {
4047 tracing::warn!(
4048 target: "reddb::backup",
4049 archived_lsn,
4050 observed,
4051 required,
4052 timeout_ms = timeout,
4053 "ack_n: timed out waiting for replicas; backup uploaded but DR posture degraded"
4054 );
4055 }
4056 crate::replication::AwaitOutcome::NotRequired => {}
4057 }
4058 }
4059 _ => {} }
4061
4062 if self.config_bool("red.config.backup.include_blob_cache", false) {
4074 let blob_cache_prefix = self.config_string(
4075 "red.config.backup.blob_cache_prefix",
4076 &format!("{snapshot_prefix}blob_cache/"),
4077 );
4078 if let Some(l2_path) = self.inner.result_blob_cache.l2_path() {
4079 match crate::storage::cache::archive_blob_cache_l2(
4080 backend.as_ref(),
4081 l2_path,
4082 &blob_cache_prefix,
4083 ) {
4084 Ok(count) => {
4085 tracing::info!(
4086 target: "reddb::backup",
4087 files_uploaded = count,
4088 blob_cache_prefix = %blob_cache_prefix,
4089 "include_blob_cache: archived L2 directory"
4090 );
4091 }
4092 Err(err) => {
4093 tracing::warn!(
4094 target: "reddb::backup",
4095 error = %err,
4096 blob_cache_prefix = %blob_cache_prefix,
4097 "include_blob_cache: L2 archive failed; backup proceeding (cache is derived state)"
4098 );
4099 }
4100 }
4101 } else {
4102 tracing::debug!(
4103 target: "reddb::backup",
4104 "include_blob_cache=true but no L2 path configured; nothing to archive"
4105 );
4106 }
4107 }
4108
4109 uploaded = true;
4110 }
4111
4112 Ok(crate::replication::scheduler::BackupResult {
4113 snapshot_id: snapshot.snapshot_id,
4114 uploaded,
4115 duration_ms: started.elapsed().as_millis() as u64,
4116 timestamp: snapshot.created_at_unix_ms as u64,
4117 })
4118 }
4119
4120 pub fn acquire(&self) -> RedDBResult<RuntimeConnection> {
4121 let mut pool = self
4122 .inner
4123 .pool
4124 .lock()
4125 .map_err(|e| RedDBError::Internal(format!("connection pool lock poisoned: {e}")))?;
4126 if pool.active >= self.inner.pool_config.max_connections {
4127 return Err(RedDBError::Internal(
4128 "connection pool exhausted".to_string(),
4129 ));
4130 }
4131
4132 let id = if let Some(id) = pool.idle.pop() {
4133 id
4134 } else {
4135 let id = pool.next_id;
4136 pool.next_id += 1;
4137 id
4138 };
4139 pool.active += 1;
4140 pool.total_checkouts += 1;
4141 drop(pool);
4142
4143 Ok(RuntimeConnection {
4144 id,
4145 inner: Arc::clone(&self.inner),
4146 })
4147 }
4148
4149 pub fn checkpoint(&self) -> RedDBResult<()> {
4150 self.inner.db.flush_local_only().map_err(|err| {
4155 let msg = err.to_string();
4160 crate::telemetry::operator_event::OperatorEvent::CheckpointFailed {
4161 lsn: 0,
4162 error: msg.clone(),
4163 }
4164 .emit_global();
4165 crate::telemetry::operator_event::OperatorEvent::WalFsyncFailed {
4166 path: "<flush_local_only>".to_string(),
4167 error: msg.clone(),
4168 }
4169 .emit_global();
4170 RedDBError::Engine(msg)
4171 })?;
4172 if let Err(err) = self.assert_remote_write_allowed("checkpoint") {
4173 tracing::warn!(
4174 target: "reddb::serverless::lease",
4175 error = %err,
4176 "checkpoint: skipping remote upload — lease not held"
4177 );
4178 return Ok(());
4179 }
4180 self.inner
4181 .db
4182 .upload_to_remote_backend()
4183 .map_err(|err| RedDBError::Engine(err.to_string()))
4184 }
4185
4186 pub(crate) fn assert_remote_write_allowed(&self, action: &str) -> RedDBResult<()> {
4193 if self.inner.db.remote_backend.is_none() {
4194 return Ok(());
4195 }
4196 match self.inner.write_gate.lease_state() {
4197 crate::runtime::write_gate::LeaseGateState::NotHeld => {
4198 self.inner.audit_log.record(
4199 action,
4200 "system",
4201 "remote_backend",
4202 "err: writer lease not held",
4203 crate::json::Value::Null,
4204 );
4205 Err(RedDBError::ReadOnly(format!(
4206 "writer lease not held — {action} blocked (serverless fence)"
4207 )))
4208 }
4209 _ => Ok(()),
4210 }
4211 }
4212
4213 pub fn run_maintenance(&self) -> RedDBResult<()> {
4214 self.inner
4215 .db
4216 .run_maintenance()
4217 .map_err(|err| RedDBError::Internal(err.to_string()))
4218 }
4219
4220 pub fn scan_collection(
4221 &self,
4222 collection: &str,
4223 cursor: Option<ScanCursor>,
4224 limit: usize,
4225 ) -> RedDBResult<ScanPage> {
4226 let store = self.inner.db.store();
4227 let manager = store
4228 .get_collection(collection)
4229 .ok_or_else(|| RedDBError::NotFound(collection.to_string()))?;
4230
4231 let mut entities = manager.query_all(|_| true);
4232 entities.sort_by_key(|entity| entity.id.raw());
4233
4234 let offset = cursor.map(|cursor| cursor.offset).unwrap_or(0);
4235 let total = entities.len();
4236 let end = total.min(offset.saturating_add(limit.max(1)));
4237 let items = if offset >= total {
4238 Vec::new()
4239 } else {
4240 entities[offset..end].to_vec()
4241 };
4242 let next = (end < total).then_some(ScanCursor { offset: end });
4243
4244 Ok(ScanPage {
4245 collection: collection.to_string(),
4246 items,
4247 next,
4248 total,
4249 })
4250 }
4251
4252 pub fn catalog(&self) -> CatalogModelSnapshot {
4253 self.inner.db.catalog_model_snapshot()
4254 }
4255
4256 pub fn catalog_consistency_report(&self) -> crate::catalog::CatalogConsistencyReport {
4257 self.inner.db.catalog_consistency_report()
4258 }
4259
4260 pub fn catalog_attention_summary(&self) -> CatalogAttentionSummary {
4261 crate::catalog::attention_summary(&self.catalog())
4262 }
4263
4264 pub fn collection_attention(&self) -> Vec<CollectionDescriptor> {
4265 crate::catalog::collection_attention(&self.catalog())
4266 }
4267
4268 pub fn index_attention(&self) -> Vec<CatalogIndexStatus> {
4269 crate::catalog::index_attention(&self.catalog())
4270 }
4271
4272 pub fn graph_projection_attention(&self) -> Vec<CatalogGraphProjectionStatus> {
4273 crate::catalog::graph_projection_attention(&self.catalog())
4274 }
4275
4276 pub fn analytics_job_attention(&self) -> Vec<CatalogAnalyticsJobStatus> {
4277 crate::catalog::analytics_job_attention(&self.catalog())
4278 }
4279
4280 pub fn stats(&self) -> RuntimeStats {
4281 let pool = runtime_pool_lock(self);
4282 RuntimeStats {
4283 active_connections: pool.active,
4284 idle_connections: pool.idle.len(),
4285 total_checkouts: pool.total_checkouts,
4286 paged_mode: self.inner.db.is_paged(),
4287 started_at_unix_ms: self.inner.started_at_unix_ms,
4288 store: self.inner.db.stats(),
4289 system: SystemInfo::collect(),
4290 result_blob_cache: self.inner.result_blob_cache.stats(),
4291 kv: self.inner.kv_stats.snapshot(),
4292 }
4293 }
4294
4295 pub fn execute_query_with_scope(
4309 &self,
4310 query: &str,
4311 scope: crate::runtime::within_clause::ScopeOverride,
4312 ) -> RedDBResult<RuntimeQueryResult> {
4313 if scope.is_empty() {
4314 return self.execute_query(query);
4315 }
4316 let _scope_guard = ScopeOverrideGuard::install(scope);
4317 self.execute_query(query)
4318 }
4319
4320 pub fn execute_query(&self, query: &str) -> RedDBResult<RuntimeQueryResult> {
4329 let started = std::time::Instant::now();
4330 let result = self.execute_query_inner(query);
4331 let elapsed_ms = started.elapsed().as_millis() as u64;
4332
4333 let scope = self.ai_scope();
4338 let kind = match result
4339 .as_ref()
4340 .map(|r| r.statement_type)
4341 .unwrap_or("select")
4342 {
4343 "select" => crate::telemetry::slow_query_logger::QueryKind::Select,
4344 "insert" => crate::telemetry::slow_query_logger::QueryKind::Insert,
4345 "update" => crate::telemetry::slow_query_logger::QueryKind::Update,
4346 "delete" => crate::telemetry::slow_query_logger::QueryKind::Delete,
4347 _ => crate::telemetry::slow_query_logger::QueryKind::Internal,
4348 };
4349 self.inner
4355 .slow_query_logger
4356 .record(kind, elapsed_ms, query.to_string(), &scope);
4357
4358 result
4359 }
4360
4361 #[inline(never)]
4362 fn execute_query_inner(&self, query: &str) -> RedDBResult<RuntimeQueryResult> {
4363 if !has_scope_override_active()
4374 && !query.trim_start().starts_with("WITHIN")
4375 && !query.trim_start().starts_with("within")
4376 && !self
4377 .inner
4378 .tx_contexts
4379 .read()
4380 .contains_key(¤t_connection_id())
4381 {
4382 if let Some(result) = self.try_fast_entity_lookup(query) {
4383 return result;
4384 }
4385 }
4386
4387 match crate::runtime::within_clause::try_strip_within_prefix(query) {
4394 Ok(Some((scope, inner))) => {
4395 let _scope_guard = ScopeOverrideGuard::install(scope);
4396 return self.execute_query_inner(inner);
4401 }
4402 Ok(None) => {}
4403 Err(msg) => return Err(RedDBError::Query(msg)),
4404 }
4405
4406 if let Some(inner) = strip_explain_prefix(query) {
4413 return self.explain_as_rows(query, inner);
4414 }
4415
4416 if let Some(value) = parse_set_local_tenant(query)? {
4421 let conn_id = current_connection_id();
4422 if !self.inner.tx_contexts.read().contains_key(&conn_id) {
4423 return Err(RedDBError::Query(
4424 "SET LOCAL TENANT requires an active transaction".to_string(),
4425 ));
4426 }
4427 self.inner
4428 .tx_local_tenants
4429 .write()
4430 .insert(conn_id, value.clone());
4431 return Ok(RuntimeQueryResult::ok_message(
4432 query.to_string(),
4433 &match &value {
4434 Some(id) => format!("local tenant set: {id}"),
4435 None => "local tenant cleared".to_string(),
4436 },
4437 "set_local_tenant",
4438 ));
4439 }
4440
4441 if super::red_schema::is_system_schema_write(query) {
4442 return Err(RedDBError::Query(
4443 super::red_schema::READ_ONLY_ERROR.to_string(),
4444 ));
4445 }
4446
4447 let rewritten_query = super::red_schema::rewrite_virtual_names(query);
4448 let execution_query = rewritten_query.as_deref().unwrap_or(query);
4449
4450 let frame = super::statement_frame::StatementExecutionFrame::build(self, execution_query)?;
4451 let _frame_guards = frame.install(self);
4452
4453 let _log_span = crate::telemetry::span::query_span(query).entered();
4460
4461 if let Some(rewritten) = frame.prepare_cte(execution_query)? {
4463 return self.execute_query_expr(rewritten);
4464 }
4465
4466 if let Some(result) = self.try_fast_entity_lookup(execution_query) {
4468 return result;
4469 }
4470
4471 if let Some(result) = frame.read_result_cache(self) {
4473 return Ok(result);
4474 }
4475
4476 let prepared = frame.prepare_statement(self, execution_query)?;
4477 let mode = prepared.mode;
4478 let expr = prepared.expr;
4479
4480 let statement = query_expr_name(&expr);
4481 let result_cache_scopes = query_expr_result_cache_scopes(&expr);
4482
4483 let _lock_guard = frame.prepare_dispatch(self, &expr)?;
4484 let frame_iface: &dyn super::statement_frame::ReadFrame = &frame;
4485
4486 let query_result = match expr {
4487 QueryExpr::Graph(_) | QueryExpr::Path(_) => {
4488 let (graph, node_properties) = self.materialize_graph_with_rls()?;
4496 let result =
4497 crate::storage::query::unified::UnifiedExecutor::execute_on_with_node_properties(
4498 &graph,
4499 &expr,
4500 node_properties,
4501 )
4502 .map_err(|err| RedDBError::Query(err.to_string()))?;
4503
4504 Ok(RuntimeQueryResult {
4505 query: query.to_string(),
4506 mode,
4507 statement,
4508 engine: "materialized-graph",
4509 result,
4510 affected_rows: 0,
4511 statement_type: "select",
4512 })
4513 }
4514 QueryExpr::Table(table) => {
4515 if super::red_schema::is_virtual_table(&table.table) {
4516 return Ok(RuntimeQueryResult {
4517 query: query.to_string(),
4518 mode,
4519 statement,
4520 engine: "runtime-red-schema",
4521 result: super::red_schema::red_query(
4522 self,
4523 &table.table,
4524 &table,
4525 &frame as &dyn super::statement_frame::ReadFrame,
4526 )?,
4527 affected_rows: 0,
4528 statement_type: "select",
4529 });
4530 }
4531
4532 if self.inner.foreign_tables.is_foreign_table(&table.table) {
4540 let records = self
4541 .inner
4542 .foreign_tables
4543 .scan(&table.table)
4544 .map_err(|e| RedDBError::Internal(e.to_string()))?;
4545 let result = apply_foreign_table_filters(records, &table);
4546 return Ok(RuntimeQueryResult {
4547 query: query.to_string(),
4548 mode,
4549 statement,
4550 engine: "runtime-fdw",
4551 result,
4552 affected_rows: 0,
4553 statement_type: "select",
4554 });
4555 }
4556
4557 let Some(table_with_rls) = self.authorize_relational_table_select(
4574 table,
4575 &frame as &dyn super::statement_frame::ReadFrame,
4576 )?
4577 else {
4578 let empty = crate::storage::query::unified::UnifiedResult::empty();
4579 return Ok(RuntimeQueryResult {
4580 query: query.to_string(),
4581 mode,
4582 statement,
4583 engine: "runtime-table-rls",
4584 result: empty,
4585 affected_rows: 0,
4586 statement_type: "select",
4587 });
4588 };
4589 Ok(RuntimeQueryResult {
4590 query: query.to_string(),
4591 mode,
4592 statement,
4593 engine: "runtime-table",
4594 result: execute_runtime_table_query(
4595 &self.inner.db,
4596 &table_with_rls,
4597 Some(&self.inner.index_store),
4598 )?,
4599 affected_rows: 0,
4600 statement_type: "select",
4601 })
4602 }
4603 QueryExpr::Join(join) => {
4604 let join_with_rls = match self.authorize_relational_join_select(
4613 join,
4614 &frame as &dyn super::statement_frame::ReadFrame,
4615 )? {
4616 Some(j) => j,
4617 None => {
4618 return Ok(RuntimeQueryResult {
4619 query: query.to_string(),
4620 mode,
4621 statement,
4622 engine: "runtime-join-rls",
4623 result: crate::storage::query::unified::UnifiedResult::empty(),
4624 affected_rows: 0,
4625 statement_type: "select",
4626 });
4627 }
4628 };
4629 Ok(RuntimeQueryResult {
4630 query: query.to_string(),
4631 mode,
4632 statement,
4633 engine: "runtime-join",
4634 result: execute_runtime_join_query(&self.inner.db, &join_with_rls)?,
4635 affected_rows: 0,
4636 statement_type: "select",
4637 })
4638 }
4639 QueryExpr::Vector(vector) => Ok(RuntimeQueryResult {
4640 query: query.to_string(),
4641 mode,
4642 statement,
4643 engine: "runtime-vector",
4644 result: execute_runtime_vector_query(&self.inner.db, &vector)?,
4645 affected_rows: 0,
4646 statement_type: "select",
4647 }),
4648 QueryExpr::Hybrid(hybrid) => Ok(RuntimeQueryResult {
4649 query: query.to_string(),
4650 mode,
4651 statement,
4652 engine: "runtime-hybrid",
4653 result: execute_runtime_hybrid_query(&self.inner.db, &hybrid)?,
4654 affected_rows: 0,
4655 statement_type: "select",
4656 }),
4657 QueryExpr::Insert(ref insert) if super::red_schema::is_virtual_table(&insert.table) => {
4659 Err(RedDBError::Query(
4660 super::red_schema::READ_ONLY_ERROR.to_string(),
4661 ))
4662 }
4663 QueryExpr::Update(ref update) if super::red_schema::is_virtual_table(&update.table) => {
4664 Err(RedDBError::Query(
4665 super::red_schema::READ_ONLY_ERROR.to_string(),
4666 ))
4667 }
4668 QueryExpr::Delete(ref delete) if super::red_schema::is_virtual_table(&delete.table) => {
4669 Err(RedDBError::Query(
4670 super::red_schema::READ_ONLY_ERROR.to_string(),
4671 ))
4672 }
4673 QueryExpr::Insert(ref insert) => self.execute_insert(query, insert),
4674 QueryExpr::Update(ref update) => self.execute_update(query, update),
4675 QueryExpr::Delete(ref delete) => self.execute_delete(query, delete),
4676 QueryExpr::CreateTable(ref create) => self.execute_create_table(query, create),
4678 QueryExpr::DropTable(ref drop_tbl) => self.execute_drop_table(query, drop_tbl),
4679 QueryExpr::DropGraph(ref drop_graph) => self.execute_drop_graph(query, drop_graph),
4680 QueryExpr::DropVector(ref drop_vector) => self.execute_drop_vector(query, drop_vector),
4681 QueryExpr::DropDocument(ref drop_document) => {
4682 self.execute_drop_document(query, drop_document)
4683 }
4684 QueryExpr::DropKv(ref drop_kv) => self.execute_drop_kv(query, drop_kv),
4685 QueryExpr::DropCollection(ref drop_collection) => {
4686 self.execute_drop_collection(query, drop_collection)
4687 }
4688 QueryExpr::Truncate(ref truncate) => self.execute_truncate(query, truncate),
4689 QueryExpr::AlterTable(ref alter) => self.execute_alter_table(query, alter),
4690 QueryExpr::ExplainAlter(ref explain) => self.execute_explain_alter(query, explain),
4691 QueryExpr::GraphCommand(ref cmd) => self.execute_graph_command(query, cmd),
4693 QueryExpr::SearchCommand(ref cmd) => self.execute_search_command(query, cmd),
4695 QueryExpr::Ask(ref ask) => self.execute_ask(query, ask),
4697 QueryExpr::CreateIndex(ref create_idx) => self.execute_create_index(query, create_idx),
4698 QueryExpr::DropIndex(ref drop_idx) => self.execute_drop_index(query, drop_idx),
4699 QueryExpr::ProbabilisticCommand(ref cmd) => {
4700 self.execute_probabilistic_command(query, cmd)
4701 }
4702 QueryExpr::CreateTimeSeries(ref ts) => self.execute_create_timeseries(query, ts),
4704 QueryExpr::DropTimeSeries(ref ts) => self.execute_drop_timeseries(query, ts),
4705 QueryExpr::CreateQueue(ref q) => self.execute_create_queue(query, q),
4707 QueryExpr::AlterQueue(ref q) => self.execute_alter_queue(query, q),
4708 QueryExpr::DropQueue(ref q) => self.execute_drop_queue(query, q),
4709 QueryExpr::QueueSelect(ref q) => self.execute_queue_select(query, q),
4710 QueryExpr::QueueCommand(ref cmd) => self.execute_queue_command(query, cmd),
4711 QueryExpr::EventsBackfill(ref backfill) => {
4712 self.execute_events_backfill(query, backfill)
4713 }
4714 QueryExpr::EventsBackfillStatus { ref collection } => Err(RedDBError::Query(format!(
4715 "EVENTS BACKFILL STATUS for '{collection}' is not implemented in this slice"
4716 ))),
4717 QueryExpr::KvCommand(ref cmd) => self.execute_kv_command(query, cmd),
4718 QueryExpr::ConfigCommand(ref cmd) => self.execute_config_command(query, cmd),
4719 QueryExpr::CreateTree(ref tree) => self.execute_create_tree(query, tree),
4720 QueryExpr::DropTree(ref tree) => self.execute_drop_tree(query, tree),
4721 QueryExpr::TreeCommand(ref cmd) => self.execute_tree_command(query, cmd),
4722 QueryExpr::SetConfig { ref key, ref value } => {
4724 if key.starts_with("red.secret.") {
4725 return Err(RedDBError::Query(
4726 "red.secret.* is reserved for vault secrets; use SET SECRET".to_string(),
4727 ));
4728 }
4729 let store = self.inner.db.store();
4730 let json_val = match value {
4731 Value::Text(s) => crate::serde_json::Value::String(s.to_string()),
4732 Value::Integer(n) => crate::serde_json::Value::Number(*n as f64),
4733 Value::Float(n) => crate::serde_json::Value::Number(*n),
4734 Value::Boolean(b) => crate::serde_json::Value::Bool(*b),
4735 _ => crate::serde_json::Value::String(value.to_string()),
4736 };
4737 store.set_config_tree(key, &json_val);
4738 update_current_config_value(key, value.clone());
4739 self.invalidate_result_cache();
4744 Ok(RuntimeQueryResult::ok_message(
4745 query.to_string(),
4746 &format!("config set: {key}"),
4747 "set",
4748 ))
4749 }
4750 QueryExpr::SetSecret { ref key, ref value } => {
4752 if key.starts_with("red.config.") {
4753 return Err(RedDBError::Query(
4754 "red.config.* is reserved for config; use SET CONFIG".to_string(),
4755 ));
4756 }
4757 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
4758 RedDBError::Query("SET SECRET requires an enabled, unsealed vault".to_string())
4759 })?;
4760 if matches!(value, Value::Null) {
4761 auth_store
4762 .vault_kv_try_delete(key)
4763 .map_err(|err| RedDBError::Query(err.to_string()))?;
4764 update_current_secret_value(key, None);
4765 self.invalidate_result_cache();
4766 return Ok(RuntimeQueryResult::ok_message(
4767 query.to_string(),
4768 &format!("secret deleted: {key}"),
4769 "delete_secret",
4770 ));
4771 }
4772 let value = secret_sql_value_to_string(value)?;
4773 auth_store
4774 .vault_kv_try_set(key.clone(), value.clone())
4775 .map_err(|err| RedDBError::Query(err.to_string()))?;
4776 update_current_secret_value(key, Some(value));
4777 self.invalidate_result_cache();
4778 Ok(RuntimeQueryResult::ok_message(
4779 query.to_string(),
4780 &format!("secret set: {key}"),
4781 "set_secret",
4782 ))
4783 }
4784 QueryExpr::DeleteSecret { ref key } => {
4786 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
4787 RedDBError::Query(
4788 "DELETE SECRET requires an enabled, unsealed vault".to_string(),
4789 )
4790 })?;
4791 let deleted = auth_store
4792 .vault_kv_try_delete(key)
4793 .map_err(|err| RedDBError::Query(err.to_string()))?;
4794 if deleted {
4795 update_current_secret_value(key, None);
4796 }
4797 self.invalidate_result_cache();
4798 Ok(RuntimeQueryResult::ok_message(
4799 query.to_string(),
4800 &format!("secret deleted: {key}"),
4801 if deleted {
4802 "delete_secret"
4803 } else {
4804 "delete_secret_not_found"
4805 },
4806 ))
4807 }
4808 QueryExpr::ShowSecrets { ref prefix } => {
4810 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
4811 RedDBError::Query("SHOW SECRET requires an enabled, unsealed vault".to_string())
4812 })?;
4813 if !auth_store.is_vault_backed() {
4814 return Err(RedDBError::Query(
4815 "SHOW SECRET requires an enabled, unsealed vault".to_string(),
4816 ));
4817 }
4818 let mut keys = auth_store.vault_kv_keys();
4819 keys.sort();
4820 let mut result = UnifiedResult::with_columns(vec![
4821 "key".into(),
4822 "value".into(),
4823 "status".into(),
4824 ]);
4825 for key in keys {
4826 if let Some(ref pfx) = prefix {
4827 if !key.starts_with(pfx) {
4828 continue;
4829 }
4830 }
4831 let mut record = UnifiedRecord::new();
4832 record.set("key", Value::text(key));
4833 record.set("value", Value::text("***"));
4834 record.set("status", Value::text("active"));
4835 result.push(record);
4836 }
4837 Ok(RuntimeQueryResult {
4838 query: query.to_string(),
4839 mode,
4840 statement: "show_secrets",
4841 engine: "runtime-secret",
4842 result,
4843 affected_rows: 0,
4844 statement_type: "select",
4845 })
4846 }
4847 QueryExpr::ShowConfig { ref prefix } => {
4849 let store = self.inner.db.store();
4850 let all_collections = store.list_collections();
4851 if !all_collections.contains(&"red_config".to_string()) {
4852 let result = UnifiedResult::with_columns(vec!["key".into(), "value".into()]);
4853 return Ok(RuntimeQueryResult {
4854 query: query.to_string(),
4855 mode,
4856 statement: "show_config",
4857 engine: "runtime-config",
4858 result,
4859 affected_rows: 0,
4860 statement_type: "select",
4861 });
4862 }
4863 let manager = store
4864 .get_collection("red_config")
4865 .ok_or_else(|| RedDBError::NotFound("red_config".to_string()))?;
4866 let entities = manager.query_all(|_| true);
4867 let mut latest = std::collections::BTreeMap::<String, (u64, Value, Value)>::new();
4868 for entity in entities {
4869 if let EntityData::Row(ref row) = entity.data {
4870 if let Some(ref named) = row.named {
4871 let key_val = named.get("key").cloned().unwrap_or(Value::Null);
4872 let val = named.get("value").cloned().unwrap_or(Value::Null);
4873 let key_str = match &key_val {
4874 Value::Text(s) => s.as_ref(),
4875 _ => continue,
4876 };
4877 if let Some(ref pfx) = prefix {
4878 if !key_str.starts_with(pfx.as_str()) {
4879 continue;
4880 }
4881 }
4882 let entity_id = entity.id.raw();
4883 match latest.get(key_str) {
4884 Some((prev_id, _, _)) if *prev_id > entity_id => {}
4885 _ => {
4886 latest.insert(key_str.to_string(), (entity_id, key_val, val));
4887 }
4888 }
4889 }
4890 }
4891 }
4892 let mut result = UnifiedResult::with_columns(vec!["key".into(), "value".into()]);
4893 for (_, key_val, val) in latest.into_values() {
4894 let mut record = UnifiedRecord::new();
4895 record.set("key", key_val);
4896 record.set("value", val);
4897 result.push(record);
4898 }
4899 Ok(RuntimeQueryResult {
4900 query: query.to_string(),
4901 mode,
4902 statement: "show_config",
4903 engine: "runtime-config",
4904 result,
4905 affected_rows: 0,
4906 statement_type: "select",
4907 })
4908 }
4909 QueryExpr::SetTenant(ref value) => {
4915 match value {
4916 Some(id) => set_current_tenant(id.clone()),
4917 None => clear_current_tenant(),
4918 }
4919 Ok(RuntimeQueryResult::ok_message(
4920 query.to_string(),
4921 &match value {
4922 Some(id) => format!("tenant set: {id}"),
4923 None => "tenant cleared".to_string(),
4924 },
4925 "set_tenant",
4926 ))
4927 }
4928 QueryExpr::ShowTenant => {
4929 let mut result = UnifiedResult::with_columns(vec!["tenant".into()]);
4930 let mut record = UnifiedRecord::new();
4931 record.set(
4932 "tenant",
4933 current_tenant().map(Value::text).unwrap_or(Value::Null),
4934 );
4935 result.push(record);
4936 Ok(RuntimeQueryResult {
4937 query: query.to_string(),
4938 mode,
4939 statement: "show_tenant",
4940 engine: "runtime-tenant",
4941 result,
4942 affected_rows: 0,
4943 statement_type: "select",
4944 })
4945 }
4946 QueryExpr::TransactionControl(ref ctl) => {
4958 use crate::storage::query::ast::TxnControl;
4959 use crate::storage::transaction::snapshot::{TxnContext, Xid};
4960 use crate::storage::transaction::IsolationLevel;
4961
4962 let conn_id = current_connection_id();
4967
4968 let (kind, msg) = match ctl {
4969 TxnControl::Begin => {
4970 let mgr = Arc::clone(&self.inner.snapshot_manager);
4971 let xid = mgr.begin();
4972 let snapshot = mgr.snapshot(xid);
4973 let ctx = TxnContext {
4974 xid,
4975 isolation: IsolationLevel::SnapshotIsolation,
4976 snapshot,
4977 savepoints: Vec::new(),
4978 released_sub_xids: Vec::new(),
4979 };
4980 self.inner.tx_contexts.write().insert(conn_id, ctx);
4981 ("begin", format!("BEGIN — xid={xid} (snapshot isolation)"))
4982 }
4983 TxnControl::Commit => {
4984 self.inner.tx_local_tenants.write().remove(&conn_id);
4986 let ctx = self.inner.tx_contexts.write().remove(&conn_id);
4987 match ctx {
4988 Some(ctx) => {
4989 for (_, sub) in &ctx.savepoints {
4995 self.inner.snapshot_manager.commit(*sub);
4996 }
4997 for sub in &ctx.released_sub_xids {
4998 self.inner.snapshot_manager.commit(*sub);
4999 }
5000 self.inner.snapshot_manager.commit(ctx.xid);
5001 self.finalize_pending_tombstones(conn_id);
5006 self.finalize_pending_kv_watch_events(conn_id);
5007 ("commit", format!("COMMIT — xid={} committed", ctx.xid))
5008 }
5009 None => (
5010 "commit",
5011 "COMMIT outside transaction — no-op (autocommit)".to_string(),
5012 ),
5013 }
5014 }
5015 TxnControl::Rollback => {
5016 self.inner.tx_local_tenants.write().remove(&conn_id);
5017 let ctx = self.inner.tx_contexts.write().remove(&conn_id);
5018 match ctx {
5019 Some(ctx) => {
5020 for (_, sub) in &ctx.savepoints {
5023 self.inner.snapshot_manager.rollback(*sub);
5024 }
5025 for sub in &ctx.released_sub_xids {
5026 self.inner.snapshot_manager.rollback(*sub);
5027 }
5028 self.inner.snapshot_manager.rollback(ctx.xid);
5029 self.revive_pending_tombstones(conn_id);
5033 self.discard_pending_kv_watch_events(conn_id);
5034 ("rollback", format!("ROLLBACK — xid={} aborted", ctx.xid))
5035 }
5036 None => (
5037 "rollback",
5038 "ROLLBACK outside transaction — no-op (autocommit)".to_string(),
5039 ),
5040 }
5041 }
5042 TxnControl::Savepoint(name) => {
5049 let mgr = Arc::clone(&self.inner.snapshot_manager);
5050 let mut guard = self.inner.tx_contexts.write();
5051 match guard.get_mut(&conn_id) {
5052 Some(ctx) => {
5053 let sub = mgr.begin();
5054 ctx.savepoints.push((name.clone(), sub));
5055 ("savepoint", format!("SAVEPOINT {name} — sub_xid={sub}"))
5056 }
5057 None => (
5058 "savepoint",
5059 "SAVEPOINT outside transaction — no-op".to_string(),
5060 ),
5061 }
5062 }
5063 TxnControl::ReleaseSavepoint(name) => {
5064 let mut guard = self.inner.tx_contexts.write();
5065 match guard.get_mut(&conn_id) {
5066 Some(ctx) => {
5067 let pos = ctx
5068 .savepoints
5069 .iter()
5070 .position(|(n, _)| n == name)
5071 .ok_or_else(|| {
5072 RedDBError::Internal(format!(
5073 "savepoint {name} does not exist"
5074 ))
5075 })?;
5076 let released = ctx.savepoints.len() - pos;
5084 let popped: Vec<Xid> = ctx
5085 .savepoints
5086 .split_off(pos)
5087 .into_iter()
5088 .map(|(_, x)| x)
5089 .collect();
5090 ctx.released_sub_xids.extend(popped);
5091 (
5092 "release_savepoint",
5093 format!("RELEASE SAVEPOINT {name} — {released} level(s)"),
5094 )
5095 }
5096 None => (
5097 "release_savepoint",
5098 "RELEASE outside transaction — no-op".to_string(),
5099 ),
5100 }
5101 }
5102 TxnControl::RollbackToSavepoint(name) => {
5103 let mgr = Arc::clone(&self.inner.snapshot_manager);
5104 let drop_result: Option<(Xid, Vec<Xid>)> = {
5109 let mut guard = self.inner.tx_contexts.write();
5110 if let Some(ctx) = guard.get_mut(&conn_id) {
5111 let pos = ctx
5112 .savepoints
5113 .iter()
5114 .position(|(n, _)| n == name)
5115 .ok_or_else(|| {
5116 RedDBError::Internal(format!(
5117 "savepoint {name} does not exist"
5118 ))
5119 })?;
5120 let savepoint_xid = ctx.savepoints[pos].1;
5121 let aborted: Vec<Xid> = ctx
5122 .savepoints
5123 .split_off(pos)
5124 .into_iter()
5125 .map(|(_, x)| x)
5126 .collect();
5127 Some((savepoint_xid, aborted))
5128 } else {
5129 None
5130 }
5131 };
5132
5133 match drop_result {
5134 Some((savepoint_xid, aborted)) => {
5135 for x in &aborted {
5136 mgr.rollback(*x);
5137 }
5138 let revived = self.revive_tombstones_since(conn_id, savepoint_xid);
5139 (
5140 "rollback_to_savepoint",
5141 format!(
5142 "ROLLBACK TO SAVEPOINT {name} — aborted {} sub_xid(s), revived {revived} tombstone(s)",
5143 aborted.len()
5144 ),
5145 )
5146 }
5147 None => (
5148 "rollback_to_savepoint",
5149 "ROLLBACK TO outside transaction — no-op".to_string(),
5150 ),
5151 }
5152 }
5153 };
5154 Ok(RuntimeQueryResult::ok_message(
5155 query.to_string(),
5156 &msg,
5157 kind,
5158 ))
5159 }
5160 QueryExpr::CreateSchema(ref q) => {
5173 let store = self.inner.db.store();
5174 let key = format!("schema.{}", q.name);
5175 if store.get_config(&key).is_some() {
5176 if q.if_not_exists {
5177 return Ok(RuntimeQueryResult::ok_message(
5178 query.to_string(),
5179 &format!("schema {} already exists — skipped", q.name),
5180 "create_schema",
5181 ));
5182 }
5183 return Err(RedDBError::Internal(format!(
5184 "schema {} already exists",
5185 q.name
5186 )));
5187 }
5188 store.set_config_tree(&key, &crate::serde_json::Value::Bool(true));
5189 Ok(RuntimeQueryResult::ok_message(
5190 query.to_string(),
5191 &format!("schema {} created", q.name),
5192 "create_schema",
5193 ))
5194 }
5195 QueryExpr::DropSchema(ref q) => {
5196 let store = self.inner.db.store();
5197 let key = format!("schema.{}", q.name);
5198 let existed = store.get_config(&key).is_some();
5199 if !existed && !q.if_exists {
5200 return Err(RedDBError::Internal(format!(
5201 "schema {} does not exist",
5202 q.name
5203 )));
5204 }
5205 store.set_config_tree(&key, &crate::serde_json::Value::Null);
5207 let suffix = if q.cascade {
5208 " (CASCADE accepted — tables untouched)"
5209 } else {
5210 ""
5211 };
5212 Ok(RuntimeQueryResult::ok_message(
5213 query.to_string(),
5214 &format!("schema {} dropped{}", q.name, suffix),
5215 "drop_schema",
5216 ))
5217 }
5218 QueryExpr::CreateSequence(ref q) => {
5219 let store = self.inner.db.store();
5220 let base = format!("sequence.{}", q.name);
5221 let start_key = format!("{base}.start");
5222 let incr_key = format!("{base}.increment");
5223 let curr_key = format!("{base}.current");
5224 if store.get_config(&start_key).is_some() {
5225 if q.if_not_exists {
5226 return Ok(RuntimeQueryResult::ok_message(
5227 query.to_string(),
5228 &format!("sequence {} already exists — skipped", q.name),
5229 "create_sequence",
5230 ));
5231 }
5232 return Err(RedDBError::Internal(format!(
5233 "sequence {} already exists",
5234 q.name
5235 )));
5236 }
5237 let initial_current = q.start - q.increment;
5240 store.set_config_tree(
5241 &start_key,
5242 &crate::serde_json::Value::Number(q.start as f64),
5243 );
5244 store.set_config_tree(
5245 &incr_key,
5246 &crate::serde_json::Value::Number(q.increment as f64),
5247 );
5248 store.set_config_tree(
5249 &curr_key,
5250 &crate::serde_json::Value::Number(initial_current as f64),
5251 );
5252 Ok(RuntimeQueryResult::ok_message(
5253 query.to_string(),
5254 &format!(
5255 "sequence {} created (start={}, increment={})",
5256 q.name, q.start, q.increment
5257 ),
5258 "create_sequence",
5259 ))
5260 }
5261 QueryExpr::DropSequence(ref q) => {
5262 let store = self.inner.db.store();
5263 let base = format!("sequence.{}", q.name);
5264 let existed = store.get_config(&format!("{base}.start")).is_some();
5265 if !existed && !q.if_exists {
5266 return Err(RedDBError::Internal(format!(
5267 "sequence {} does not exist",
5268 q.name
5269 )));
5270 }
5271 for k in ["start", "increment", "current"] {
5272 store.set_config_tree(&format!("{base}.{k}"), &crate::serde_json::Value::Null);
5273 }
5274 Ok(RuntimeQueryResult::ok_message(
5275 query.to_string(),
5276 &format!("sequence {} dropped", q.name),
5277 "drop_sequence",
5278 ))
5279 }
5280 QueryExpr::CreateView(ref q) => {
5290 let mut views = self.inner.views.write();
5291 if views.contains_key(&q.name) && !q.or_replace {
5292 if q.if_not_exists {
5293 return Ok(RuntimeQueryResult::ok_message(
5294 query.to_string(),
5295 &format!("view {} already exists — skipped", q.name),
5296 "create_view",
5297 ));
5298 }
5299 return Err(RedDBError::Internal(format!(
5300 "view {} already exists",
5301 q.name
5302 )));
5303 }
5304 views.insert(q.name.clone(), Arc::new(q.clone()));
5305 drop(views);
5306
5307 if q.materialized {
5309 use crate::storage::cache::result::{MaterializedViewDef, RefreshPolicy};
5310 let def = MaterializedViewDef {
5311 name: q.name.clone(),
5312 query: format!("<parsed view {}>", q.name),
5313 dependencies: collect_table_refs(&q.query),
5314 refresh: RefreshPolicy::Manual,
5315 };
5316 self.inner.materialized_views.write().register(def);
5317 }
5318 self.invalidate_plan_cache();
5323 self.invalidate_result_cache();
5324
5325 Ok(RuntimeQueryResult::ok_message(
5326 query.to_string(),
5327 &format!(
5328 "{}view {} created",
5329 if q.materialized { "materialized " } else { "" },
5330 q.name
5331 ),
5332 "create_view",
5333 ))
5334 }
5335 QueryExpr::DropView(ref q) => {
5336 let mut views = self.inner.views.write();
5337 let existed = views.remove(&q.name).is_some();
5338 drop(views);
5339 if q.materialized || existed {
5340 self.inner.materialized_views.write().remove(&q.name);
5342 }
5343 self.invalidate_plan_cache();
5346 self.invalidate_result_cache();
5347 if !existed && !q.if_exists {
5348 return Err(RedDBError::Internal(format!(
5349 "view {} does not exist",
5350 q.name
5351 )));
5352 }
5353 self.invalidate_plan_cache();
5354 Ok(RuntimeQueryResult::ok_message(
5355 query.to_string(),
5356 &format!("view {} dropped", q.name),
5357 "drop_view",
5358 ))
5359 }
5360 QueryExpr::RefreshMaterializedView(ref q) => {
5361 let view = {
5364 let views = self.inner.views.read();
5365 views.get(&q.name).cloned()
5366 };
5367 let view = match view {
5368 Some(v) => v,
5369 None => {
5370 return Err(RedDBError::Internal(format!(
5371 "view {} does not exist",
5372 q.name
5373 )))
5374 }
5375 };
5376 if !view.materialized {
5377 return Err(RedDBError::Internal(format!(
5378 "view {} is not materialized — REFRESH requires \
5379 CREATE MATERIALIZED VIEW",
5380 q.name
5381 )));
5382 }
5383 let inner_result = self.execute_query_expr((*view.query).clone())?;
5385 let serialized = format!("{:?}", inner_result.result);
5388 self.inner
5389 .materialized_views
5390 .write()
5391 .refresh(&q.name, serialized.into_bytes());
5392 Ok(RuntimeQueryResult::ok_message(
5393 query.to_string(),
5394 &format!("materialized view {} refreshed", q.name),
5395 "refresh_materialized_view",
5396 ))
5397 }
5398 QueryExpr::CreatePolicy(ref q) => {
5405 let key = (q.table.clone(), q.name.clone());
5406 self.inner
5407 .rls_policies
5408 .write()
5409 .insert(key, Arc::new(q.clone()));
5410 self.invalidate_plan_cache();
5411 self.schema_vocabulary_apply(
5415 crate::runtime::schema_vocabulary::DdlEvent::CreatePolicy {
5416 collection: q.table.clone(),
5417 policy: q.name.clone(),
5418 },
5419 );
5420 Ok(RuntimeQueryResult::ok_message(
5421 query.to_string(),
5422 &format!("policy {} on {} created", q.name, q.table),
5423 "create_policy",
5424 ))
5425 }
5426 QueryExpr::DropPolicy(ref q) => {
5427 let removed = self
5428 .inner
5429 .rls_policies
5430 .write()
5431 .remove(&(q.table.clone(), q.name.clone()))
5432 .is_some();
5433 if !removed && !q.if_exists {
5434 return Err(RedDBError::Internal(format!(
5435 "policy {} on {} does not exist",
5436 q.name, q.table
5437 )));
5438 }
5439 self.invalidate_plan_cache();
5440 self.schema_vocabulary_apply(
5443 crate::runtime::schema_vocabulary::DdlEvent::DropPolicy {
5444 collection: q.table.clone(),
5445 policy: q.name.clone(),
5446 },
5447 );
5448 Ok(RuntimeQueryResult::ok_message(
5449 query.to_string(),
5450 &format!("policy {} on {} dropped", q.name, q.table),
5451 "drop_policy",
5452 ))
5453 }
5454 QueryExpr::CreateServer(ref q) => {
5465 use crate::storage::fdw::FdwOptions;
5466 let registry = Arc::clone(&self.inner.foreign_tables);
5467 if registry.server(&q.name).is_some() {
5468 if q.if_not_exists {
5469 return Ok(RuntimeQueryResult::ok_message(
5470 query.to_string(),
5471 &format!("server {} already exists — skipped", q.name),
5472 "create_server",
5473 ));
5474 }
5475 return Err(RedDBError::Internal(format!(
5476 "server {} already exists",
5477 q.name
5478 )));
5479 }
5480 let mut opts = FdwOptions::new();
5481 for (k, v) in &q.options {
5482 opts.values.insert(k.clone(), v.clone());
5483 }
5484 registry
5485 .create_server(&q.name, &q.wrapper, opts)
5486 .map_err(|e| RedDBError::Internal(e.to_string()))?;
5487 Ok(RuntimeQueryResult::ok_message(
5488 query.to_string(),
5489 &format!("server {} created (wrapper {})", q.name, q.wrapper),
5490 "create_server",
5491 ))
5492 }
5493 QueryExpr::DropServer(ref q) => {
5494 let existed = self.inner.foreign_tables.drop_server(&q.name);
5495 if !existed && !q.if_exists {
5496 return Err(RedDBError::Internal(format!(
5497 "server {} does not exist",
5498 q.name
5499 )));
5500 }
5501 Ok(RuntimeQueryResult::ok_message(
5502 query.to_string(),
5503 &format!(
5504 "server {} dropped{}",
5505 q.name,
5506 if q.cascade { " (cascade)" } else { "" }
5507 ),
5508 "drop_server",
5509 ))
5510 }
5511 QueryExpr::CreateForeignTable(ref q) => {
5512 use crate::storage::fdw::{FdwOptions, ForeignColumn, ForeignTable};
5513 let registry = Arc::clone(&self.inner.foreign_tables);
5514 if registry.foreign_table(&q.name).is_some() {
5515 if q.if_not_exists {
5516 return Ok(RuntimeQueryResult::ok_message(
5517 query.to_string(),
5518 &format!("foreign table {} already exists — skipped", q.name),
5519 "create_foreign_table",
5520 ));
5521 }
5522 return Err(RedDBError::Internal(format!(
5523 "foreign table {} already exists",
5524 q.name
5525 )));
5526 }
5527 let mut opts = FdwOptions::new();
5528 for (k, v) in &q.options {
5529 opts.values.insert(k.clone(), v.clone());
5530 }
5531 let columns: Vec<ForeignColumn> = q
5532 .columns
5533 .iter()
5534 .map(|c| ForeignColumn {
5535 name: c.name.clone(),
5536 data_type: c.data_type.clone(),
5537 not_null: c.not_null,
5538 })
5539 .collect();
5540 registry
5541 .create_foreign_table(ForeignTable {
5542 name: q.name.clone(),
5543 server_name: q.server.clone(),
5544 columns,
5545 options: opts,
5546 })
5547 .map_err(|e| RedDBError::Internal(e.to_string()))?;
5548 self.invalidate_plan_cache();
5549 Ok(RuntimeQueryResult::ok_message(
5550 query.to_string(),
5551 &format!("foreign table {} created (server {})", q.name, q.server),
5552 "create_foreign_table",
5553 ))
5554 }
5555 QueryExpr::DropForeignTable(ref q) => {
5556 let existed = self.inner.foreign_tables.drop_foreign_table(&q.name);
5557 if !existed && !q.if_exists {
5558 return Err(RedDBError::Internal(format!(
5559 "foreign table {} does not exist",
5560 q.name
5561 )));
5562 }
5563 self.invalidate_plan_cache();
5564 Ok(RuntimeQueryResult::ok_message(
5565 query.to_string(),
5566 &format!("foreign table {} dropped", q.name),
5567 "drop_foreign_table",
5568 ))
5569 }
5570 QueryExpr::CopyFrom(ref q) => {
5576 use crate::storage::import::{CsvConfig, CsvImporter};
5577 let store = self.inner.db.store();
5578 let cfg = CsvConfig {
5579 collection: q.table.clone(),
5580 has_header: q.has_header,
5581 delimiter: q.delimiter.map(|c| c as u8).unwrap_or(b','),
5582 ..CsvConfig::default()
5583 };
5584 let importer = CsvImporter::new(cfg);
5585 let stats = importer
5586 .import_file(&q.path, store.as_ref())
5587 .map_err(|e| RedDBError::Internal(format!("COPY failed: {e}")))?;
5588 self.note_table_write(&q.table);
5590 Ok(RuntimeQueryResult::ok_message(
5591 query.to_string(),
5592 &format!(
5593 "COPY imported {} rows into {} ({} errors skipped, {}ms)",
5594 stats.records_imported, q.table, stats.errors_skipped, stats.duration_ms
5595 ),
5596 "copy_from",
5597 ))
5598 }
5599 QueryExpr::MaintenanceCommand(ref cmd) => {
5615 use crate::storage::query::ast::MaintenanceCommand as Mc;
5616 let store = self.inner.db.store();
5617 let (kind, msg) = match cmd {
5618 Mc::Analyze { target } => {
5619 let targets: Vec<String> = match target {
5620 Some(t) => vec![t.clone()],
5621 None => store.list_collections(),
5622 };
5623 for t in &targets {
5624 self.refresh_table_planner_stats(t);
5625 }
5626 (
5627 "analyze",
5628 format!("ANALYZE refreshed stats for {} table(s)", targets.len()),
5629 )
5630 }
5631 Mc::Vacuum { target, full } => {
5632 let targets: Vec<String> = match target {
5633 Some(t) => vec![t.clone()],
5634 None => store.list_collections(),
5635 };
5636 for t in &targets {
5638 self.refresh_table_planner_stats(t);
5639 }
5640 let persisted = if *full {
5644 match store.persist() {
5645 Ok(()) => true,
5646 Err(e) => {
5647 return Err(RedDBError::Internal(format!(
5648 "VACUUM FULL persist failed: {e:?}"
5649 )));
5650 }
5651 }
5652 } else {
5653 false
5654 };
5655 self.invalidate_result_cache();
5657 (
5658 "vacuum",
5659 format!(
5660 "VACUUM{} processed {} table(s){}",
5661 if *full { " FULL" } else { "" },
5662 targets.len(),
5663 if persisted {
5664 " (pages flushed to disk)"
5665 } else {
5666 ""
5667 }
5668 ),
5669 )
5670 }
5671 };
5672 Ok(RuntimeQueryResult::ok_message(
5673 query.to_string(),
5674 &msg,
5675 kind,
5676 ))
5677 }
5678 QueryExpr::Grant(ref g) => self.execute_grant_statement(query, g),
5685 QueryExpr::Revoke(ref r) => self.execute_revoke_statement(query, r),
5686 QueryExpr::AlterUser(ref a) => self.execute_alter_user_statement(query, a),
5687 QueryExpr::CreateIamPolicy { ref id, ref json } => {
5688 self.execute_create_iam_policy(query, id, json)
5689 }
5690 QueryExpr::DropIamPolicy { ref id } => self.execute_drop_iam_policy(query, id),
5691 QueryExpr::AttachPolicy {
5692 ref policy_id,
5693 ref principal,
5694 } => self.execute_attach_policy(query, policy_id, principal),
5695 QueryExpr::DetachPolicy {
5696 ref policy_id,
5697 ref principal,
5698 } => self.execute_detach_policy(query, policy_id, principal),
5699 QueryExpr::ShowPolicies { ref filter } => {
5700 self.execute_show_policies(query, filter.as_ref())
5701 }
5702 QueryExpr::ShowEffectivePermissions {
5703 ref user,
5704 ref resource,
5705 } => self.execute_show_effective_permissions(query, user, resource.as_ref()),
5706 QueryExpr::SimulatePolicy {
5707 ref user,
5708 ref action,
5709 ref resource,
5710 } => self.execute_simulate_policy(query, user, action, resource),
5711 QueryExpr::CreateMigration(ref q) => self.execute_create_migration(query, q),
5712 QueryExpr::ApplyMigration(ref q) => self.execute_apply_migration(query, q),
5713 QueryExpr::RollbackMigration(ref q) => self.execute_rollback_migration(query, q),
5714 QueryExpr::ExplainMigration(ref q) => self.execute_explain_migration(query, q),
5715 };
5716
5717 let mut query_result = query_result;
5721 if let Ok(ref mut result) = query_result {
5722 if result.statement_type == "select" {
5723 self.apply_secret_decryption(result);
5724 }
5725 }
5726
5727 if let Ok(ref result) = query_result {
5734 frame.write_result_cache(self, result, result_cache_scopes);
5735 }
5736
5737 query_result
5738 }
5739
5740 pub(crate) fn execute_query_expr(&self, expr: QueryExpr) -> RedDBResult<RuntimeQueryResult> {
5746 let _config_snapshot_guard = ConfigSnapshotGuard::install(Arc::clone(&self.inner.db));
5747 let _secret_store_guard = SecretStoreGuard::install(self.inner.auth_store.read().clone());
5748 let expr = self.rewrite_view_refs(expr);
5752
5753 self.validate_model_operations_before_auth(&expr)?;
5754 if let Err(err) = self.check_query_privilege(&expr) {
5758 return Err(RedDBError::Query(format!("permission denied: {err}")));
5759 }
5760
5761 let statement = query_expr_name(&expr);
5762 let mode = detect_mode(statement);
5763 let query_str = statement;
5764
5765 let result = self.dispatch_expr(expr, query_str, mode)?;
5766 let mut r = result;
5767 if r.statement_type == "select" {
5768 self.apply_secret_decryption(&mut r);
5769 }
5770 Ok(r)
5771 }
5772
5773 pub(super) fn validate_model_operations_before_auth(
5774 &self,
5775 expr: &QueryExpr,
5776 ) -> RedDBResult<()> {
5777 use crate::catalog::CollectionModel;
5778 use crate::runtime::ddl::polymorphic_resolver;
5779 use crate::storage::query::ast::KvCommand;
5780
5781 let system_schema_target = match expr {
5782 QueryExpr::DropTable(q) => Some(q.name.as_str()),
5783 QueryExpr::DropGraph(q) => Some(q.name.as_str()),
5784 QueryExpr::DropVector(q) => Some(q.name.as_str()),
5785 QueryExpr::DropDocument(q) => Some(q.name.as_str()),
5786 QueryExpr::DropKv(q) => Some(q.name.as_str()),
5787 QueryExpr::DropCollection(q) => Some(q.name.as_str()),
5788 QueryExpr::Truncate(q) => Some(q.name.as_str()),
5789 _ => None,
5790 };
5791 if system_schema_target.is_some_and(crate::runtime::impl_ddl::is_system_schema_name) {
5792 return Err(RedDBError::Query("system schema is read-only".to_string()));
5793 }
5794
5795 let expected = match expr {
5796 QueryExpr::DropTable(q) => Some((q.name.as_str(), CollectionModel::Table)),
5797 QueryExpr::DropGraph(q) => Some((q.name.as_str(), CollectionModel::Graph)),
5798 QueryExpr::DropVector(q) => Some((q.name.as_str(), CollectionModel::Vector)),
5799 QueryExpr::DropDocument(q) => Some((q.name.as_str(), CollectionModel::Document)),
5800 QueryExpr::DropKv(q) => Some((q.name.as_str(), q.model)),
5801 QueryExpr::Truncate(q) => q.model.map(|model| (q.name.as_str(), model)),
5802 QueryExpr::KvCommand(cmd) => {
5803 let (collection, model) = match cmd {
5804 KvCommand::Put {
5805 collection, model, ..
5806 }
5807 | KvCommand::Get {
5808 collection, model, ..
5809 }
5810 | KvCommand::Incr {
5811 collection, model, ..
5812 }
5813 | KvCommand::Cas {
5814 collection, model, ..
5815 }
5816 | KvCommand::Delete {
5817 collection, model, ..
5818 } => (collection.as_str(), *model),
5819 KvCommand::Rotate { collection, .. }
5820 | KvCommand::History { collection, .. }
5821 | KvCommand::List { collection, .. }
5822 | KvCommand::Purge { collection, .. } => {
5823 (collection.as_str(), CollectionModel::Vault)
5824 }
5825 KvCommand::InvalidateTags { collection, .. } => {
5826 (collection.as_str(), CollectionModel::Kv)
5827 }
5828 KvCommand::Watch {
5829 collection, model, ..
5830 } => (collection.as_str(), *model),
5831 KvCommand::Unseal { collection, .. } => {
5832 (collection.as_str(), CollectionModel::Vault)
5833 }
5834 };
5835 Some((collection, model))
5836 }
5837 QueryExpr::ConfigCommand(cmd) => {
5838 self.validate_config_command_before_auth(cmd)?;
5839 None
5840 }
5841 _ => None,
5842 };
5843
5844 let Some((name, expected_model)) = expected else {
5845 return Ok(());
5846 };
5847 let snapshot = self.inner.db.catalog_model_snapshot();
5848 let Some(actual_model) = snapshot
5849 .collections
5850 .iter()
5851 .find(|collection| collection.name == name)
5852 .map(|collection| collection.declared_model.unwrap_or(collection.model))
5853 else {
5854 return Ok(());
5855 };
5856 polymorphic_resolver::ensure_model_match(expected_model, actual_model)
5857 }
5858
5859 pub(super) fn rewrite_view_refs(&self, expr: QueryExpr) -> QueryExpr {
5864 if self.inner.views.read().is_empty() {
5866 return expr;
5867 }
5868 self.rewrite_view_refs_inner(expr)
5869 }
5870
5871 fn rewrite_view_refs_inner(&self, expr: QueryExpr) -> QueryExpr {
5872 use crate::storage::query::ast::{Filter, TableSource};
5873 match expr {
5874 QueryExpr::Table(mut tq) => {
5875 if let Some(TableSource::Subquery(body)) = tq.source.take() {
5881 tq.source = Some(TableSource::Subquery(Box::new(
5882 self.rewrite_view_refs_inner(*body),
5883 )));
5884 return QueryExpr::Table(tq);
5885 }
5886
5887 let maybe_view = {
5891 let views = self.inner.views.read();
5892 views.get(&tq.table).cloned()
5893 };
5894 let Some(view) = maybe_view else {
5895 return QueryExpr::Table(tq);
5896 };
5897
5898 let inner_expr = self.rewrite_view_refs_inner((*view.query).clone());
5902
5903 match inner_expr {
5911 QueryExpr::Table(mut inner_tq) => {
5912 if let Some(outer_filter) = tq.filter.take() {
5913 inner_tq.filter = Some(match inner_tq.filter.take() {
5914 Some(existing) => {
5915 Filter::And(Box::new(existing), Box::new(outer_filter))
5916 }
5917 None => outer_filter,
5918 });
5919 }
5920 if let Some(outer_limit) = tq.limit {
5921 inner_tq.limit = Some(match inner_tq.limit {
5922 Some(existing) => existing.min(outer_limit),
5923 None => outer_limit,
5924 });
5925 }
5926 if let Some(outer_offset) = tq.offset {
5927 inner_tq.offset = Some(match inner_tq.offset {
5928 Some(existing) => existing + outer_offset,
5929 None => outer_offset,
5930 });
5931 }
5932 QueryExpr::Table(inner_tq)
5933 }
5934 other => other,
5935 }
5936 }
5937 QueryExpr::Join(mut jq) => {
5938 jq.left = Box::new(self.rewrite_view_refs_inner(*jq.left));
5939 jq.right = Box::new(self.rewrite_view_refs_inner(*jq.right));
5940 QueryExpr::Join(jq)
5941 }
5942 other => other,
5945 }
5946 }
5947
5948 fn authorize_relational_table_select(
5952 &self,
5953 mut table: TableQuery,
5954 frame: &dyn super::statement_frame::ReadFrame,
5955 ) -> RedDBResult<Option<TableQuery>> {
5956 if let Some(TableSource::Subquery(inner)) = table.source.take() {
5957 let authorized_inner = self.authorize_relational_select_expr(*inner, frame)?;
5958 table.source = Some(TableSource::Subquery(Box::new(authorized_inner)));
5959 return Ok(Some(table));
5960 }
5961
5962 self.check_table_column_projection_authz(&table, frame)?;
5963
5964 if self.inner.rls_enabled_tables.read().contains(&table.table) {
5965 return Ok(inject_rls_filters(self, frame, table));
5966 }
5967
5968 Ok(Some(table))
5969 }
5970
5971 fn authorize_relational_join_select(
5972 &self,
5973 mut join: JoinQuery,
5974 frame: &dyn super::statement_frame::ReadFrame,
5975 ) -> RedDBResult<Option<JoinQuery>> {
5976 self.check_join_column_projection_authz(&join, frame)?;
5977 join.left = Box::new(self.authorize_relational_join_child(*join.left, frame)?);
5978 join.right = Box::new(self.authorize_relational_join_child(*join.right, frame)?);
5979 Ok(inject_rls_into_join(self, frame, join))
5980 }
5981
5982 fn authorize_relational_join_child(
5983 &self,
5984 expr: QueryExpr,
5985 frame: &dyn super::statement_frame::ReadFrame,
5986 ) -> RedDBResult<QueryExpr> {
5987 match expr {
5988 QueryExpr::Table(mut table) => {
5989 if let Some(TableSource::Subquery(inner)) = table.source.take() {
5990 let authorized_inner = self.authorize_relational_select_expr(*inner, frame)?;
5991 table.source = Some(TableSource::Subquery(Box::new(authorized_inner)));
5992 }
5993 Ok(QueryExpr::Table(table))
5994 }
5995 QueryExpr::Join(join) => self
5996 .authorize_relational_join_select(join, frame)?
5997 .map(QueryExpr::Join)
5998 .ok_or_else(|| {
5999 RedDBError::Query("permission denied: RLS denied relational subquery".into())
6000 }),
6001 other => Ok(other),
6002 }
6003 }
6004
6005 fn authorize_relational_select_expr(
6006 &self,
6007 expr: QueryExpr,
6008 frame: &dyn super::statement_frame::ReadFrame,
6009 ) -> RedDBResult<QueryExpr> {
6010 match expr {
6011 QueryExpr::Table(table) => self
6012 .authorize_relational_table_select(table, frame)?
6013 .map(QueryExpr::Table)
6014 .ok_or_else(|| {
6015 RedDBError::Query("permission denied: RLS denied relational subquery".into())
6016 }),
6017 QueryExpr::Join(join) => self
6018 .authorize_relational_join_select(join, frame)?
6019 .map(QueryExpr::Join)
6020 .ok_or_else(|| {
6021 RedDBError::Query("permission denied: RLS denied relational subquery".into())
6022 }),
6023 other => Ok(other),
6024 }
6025 }
6026
6027 fn check_table_column_projection_authz(
6028 &self,
6029 table: &TableQuery,
6030 frame: &dyn super::statement_frame::ReadFrame,
6031 ) -> RedDBResult<()> {
6032 let Some((username, role)) = frame.identity() else {
6033 return Ok(());
6034 };
6035 let Some(auth_store) = self.inner.auth_store.read().clone() else {
6036 return Ok(());
6037 };
6038
6039 let columns = self.resolved_table_projection_columns(table)?;
6040 let request = ColumnAccessRequest::select(table.table.clone(), columns);
6041 let principal = UserId::from_parts(frame.effective_scope(), username);
6042 let ctx = runtime_iam_context(role, frame.effective_scope());
6043 let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
6044 if outcome.allowed() {
6045 return Ok(());
6046 }
6047
6048 if let Some(denied) = outcome.first_denied_column() {
6049 return Err(RedDBError::Query(format!(
6050 "permission denied: principal=`{username}` cannot select column `{}`",
6051 denied.resource.name
6052 )));
6053 }
6054 Err(RedDBError::Query(format!(
6055 "permission denied: principal=`{username}` cannot select table `{}`",
6056 table.table
6057 )))
6058 }
6059
6060 fn check_join_column_projection_authz(
6061 &self,
6062 join: &JoinQuery,
6063 frame: &dyn super::statement_frame::ReadFrame,
6064 ) -> RedDBResult<()> {
6065 let mut by_table: HashMap<String, BTreeSet<String>> = HashMap::new();
6066 let projections = crate::storage::query::sql_lowering::effective_join_projections(join);
6067 self.collect_join_projection_columns(join, &projections, &mut by_table)?;
6068
6069 for (table, columns) in by_table {
6070 let query = TableQuery {
6071 table,
6072 source: None,
6073 alias: None,
6074 select_items: Vec::new(),
6075 columns: columns.into_iter().map(Projection::Column).collect(),
6076 where_expr: None,
6077 filter: None,
6078 group_by_exprs: Vec::new(),
6079 group_by: Vec::new(),
6080 having_expr: None,
6081 having: None,
6082 order_by: Vec::new(),
6083 limit: None,
6084 offset: None,
6085 expand: None,
6086 as_of: None,
6087 };
6088 self.check_table_column_projection_authz(&query, frame)?;
6089 }
6090 Ok(())
6091 }
6092
6093 fn collect_join_projection_columns(
6094 &self,
6095 join: &JoinQuery,
6096 projections: &[Projection],
6097 out: &mut HashMap<String, BTreeSet<String>>,
6098 ) -> RedDBResult<()> {
6099 let left = table_side_context(join.left.as_ref());
6100 let right = table_side_context(join.right.as_ref());
6101
6102 if projections
6103 .iter()
6104 .any(|projection| matches!(projection, Projection::All))
6105 {
6106 for side in [left.as_ref(), right.as_ref()].into_iter().flatten() {
6107 out.entry(side.table.clone())
6108 .or_default()
6109 .extend(self.table_all_projection_columns(&side.table)?);
6110 }
6111 return Ok(());
6112 }
6113
6114 for projection in projections {
6115 collect_projection_columns_for_join_side(
6116 projection,
6117 left.as_ref(),
6118 right.as_ref(),
6119 out,
6120 )?;
6121 }
6122 Ok(())
6123 }
6124
6125 fn resolved_table_projection_columns(&self, table: &TableQuery) -> RedDBResult<Vec<String>> {
6126 let projections = crate::storage::query::sql_lowering::effective_table_projections(table);
6127 if projections
6128 .iter()
6129 .any(|projection| matches!(projection, Projection::All))
6130 {
6131 return self.table_all_projection_columns(&table.table);
6132 }
6133
6134 let mut columns = BTreeSet::new();
6135 for projection in &projections {
6136 collect_projection_columns_for_table(
6137 projection,
6138 &table.table,
6139 table.alias.as_deref(),
6140 &mut columns,
6141 );
6142 }
6143 Ok(columns.into_iter().collect())
6144 }
6145
6146 fn table_all_projection_columns(&self, table: &str) -> RedDBResult<Vec<String>> {
6147 if let Some(contract) = self.inner.db.collection_contract_arc(table) {
6148 let columns: Vec<String> = contract
6149 .declared_columns
6150 .iter()
6151 .map(|column| column.name.clone())
6152 .collect();
6153 if !columns.is_empty() {
6154 return Ok(columns);
6155 }
6156 }
6157
6158 let records = scan_runtime_table_source_records_limited(&self.inner.db, table, Some(1))?;
6159 Ok(records
6160 .first()
6161 .map(|record| {
6162 record
6163 .column_names()
6164 .into_iter()
6165 .map(|column| column.to_string())
6166 .collect()
6167 })
6168 .unwrap_or_default())
6169 }
6170
6171 fn dispatch_expr(
6172 &self,
6173 expr: QueryExpr,
6174 query_str: &str,
6175 mode: QueryMode,
6176 ) -> RedDBResult<RuntimeQueryResult> {
6177 let statement = query_expr_name(&expr);
6178 match expr {
6179 QueryExpr::Graph(_) | QueryExpr::Path(_) => {
6180 Err(RedDBError::Query(
6182 "graph queries cannot be used as prepared statements".to_string(),
6183 ))
6184 }
6185 QueryExpr::Table(table) => {
6186 let scope = self.ai_scope();
6187 if super::red_schema::is_virtual_table(&table.table) {
6188 return Ok(RuntimeQueryResult {
6189 query: query_str.to_string(),
6190 mode,
6191 statement,
6192 engine: "runtime-red-schema",
6193 result: super::red_schema::red_query(
6194 self,
6195 &table.table,
6196 &table,
6197 &scope as &dyn super::statement_frame::ReadFrame,
6198 )?,
6199 affected_rows: 0,
6200 statement_type: "select",
6201 });
6202 }
6203 let Some(table_with_rls) = self.authorize_relational_table_select(
6204 table,
6205 &scope as &dyn super::statement_frame::ReadFrame,
6206 )?
6207 else {
6208 return Ok(RuntimeQueryResult {
6209 query: query_str.to_string(),
6210 mode,
6211 statement,
6212 engine: "runtime-table-rls",
6213 result: crate::storage::query::unified::UnifiedResult::empty(),
6214 affected_rows: 0,
6215 statement_type: "select",
6216 });
6217 };
6218 Ok(RuntimeQueryResult {
6219 query: query_str.to_string(),
6220 mode,
6221 statement,
6222 engine: "runtime-table",
6223 result: execute_runtime_table_query(
6224 &self.inner.db,
6225 &table_with_rls,
6226 Some(&self.inner.index_store),
6227 )?,
6228 affected_rows: 0,
6229 statement_type: "select",
6230 })
6231 }
6232 QueryExpr::Join(join) => {
6233 let scope = self.ai_scope();
6234 let Some(join_with_rls) = self.authorize_relational_join_select(
6235 join,
6236 &scope as &dyn super::statement_frame::ReadFrame,
6237 )?
6238 else {
6239 return Ok(RuntimeQueryResult {
6240 query: query_str.to_string(),
6241 mode,
6242 statement,
6243 engine: "runtime-join-rls",
6244 result: crate::storage::query::unified::UnifiedResult::empty(),
6245 affected_rows: 0,
6246 statement_type: "select",
6247 });
6248 };
6249 Ok(RuntimeQueryResult {
6250 query: query_str.to_string(),
6251 mode,
6252 statement,
6253 engine: "runtime-join",
6254 result: execute_runtime_join_query(&self.inner.db, &join_with_rls)?,
6255 affected_rows: 0,
6256 statement_type: "select",
6257 })
6258 }
6259 QueryExpr::Vector(vector) => Ok(RuntimeQueryResult {
6260 query: query_str.to_string(),
6261 mode,
6262 statement,
6263 engine: "runtime-vector",
6264 result: execute_runtime_vector_query(&self.inner.db, &vector)?,
6265 affected_rows: 0,
6266 statement_type: "select",
6267 }),
6268 QueryExpr::Hybrid(hybrid) => Ok(RuntimeQueryResult {
6269 query: query_str.to_string(),
6270 mode,
6271 statement,
6272 engine: "runtime-hybrid",
6273 result: execute_runtime_hybrid_query(&self.inner.db, &hybrid)?,
6274 affected_rows: 0,
6275 statement_type: "select",
6276 }),
6277 _ => Err(RedDBError::Query(format!(
6278 "prepared-statement execution does not support {statement} statements"
6279 ))),
6280 }
6281 }
6282
6283 fn try_fast_entity_lookup(&self, query: &str) -> Option<RedDBResult<RuntimeQueryResult>> {
6286 let q = query.trim();
6289 if !q.starts_with("SELECT") && !q.starts_with("select") {
6290 return None;
6291 }
6292
6293 let where_pos = q
6295 .find("WHERE _entity_id")
6296 .or_else(|| q.find("where _entity_id"))?;
6297 let after_field = &q[where_pos + 16..].trim_start(); let after_eq = after_field.strip_prefix('=')?.trim_start();
6299
6300 let id_str = after_eq.trim();
6302 let entity_id: u64 = id_str.parse().ok()?;
6303
6304 let from_pos = q.find("FROM ").or_else(|| q.find("from "))? + 5;
6306 let table = q[from_pos..where_pos].trim();
6307 if table.is_empty()
6308 || table.contains(' ') && !table.contains(" AS ") && !table.contains(" as ")
6309 {
6310 return None; }
6312 let table_name = table.split_whitespace().next()?;
6313
6314 let store = self.inner.db.store();
6320 let entity = store
6321 .get(
6322 table_name,
6323 crate::storage::unified::EntityId::new(entity_id),
6324 )
6325 .filter(entity_visible_under_current_snapshot);
6326
6327 let count = if entity.is_some() { 1u64 } else { 0 };
6328
6329 let records: Vec<crate::storage::query::unified::UnifiedRecord> = entity
6335 .as_ref()
6336 .and_then(|e| runtime_table_record_from_entity(e.clone()))
6337 .into_iter()
6338 .collect();
6339
6340 let json = match entity {
6341 Some(ref e) => execute_runtime_serialize_single_entity(e),
6342 None => r#"{"columns":[],"record_count":0,"selection":{"scope":"any"},"records":[]}"#
6343 .to_string(),
6344 };
6345
6346 Some(Ok(RuntimeQueryResult {
6347 query: query.to_string(),
6348 mode: crate::storage::query::modes::QueryMode::Sql,
6349 statement: "select",
6350 engine: "fast-entity-lookup",
6351 result: crate::storage::query::unified::UnifiedResult {
6352 columns: Vec::new(),
6353 records,
6354 stats: crate::storage::query::unified::QueryStats {
6355 rows_scanned: count,
6356 ..Default::default()
6357 },
6358 pre_serialized_json: Some(json),
6359 },
6360 affected_rows: 0,
6361 statement_type: "select",
6362 }))
6363 }
6364
6365 fn result_cache_backend(&self) -> RuntimeResultCacheBackend {
6366 match self
6367 .config_string(RESULT_CACHE_BACKEND_KEY, RESULT_CACHE_DEFAULT_BACKEND)
6368 .as_str()
6369 {
6370 "blob_cache" => RuntimeResultCacheBackend::BlobCache,
6371 "shadow" => RuntimeResultCacheBackend::Shadow,
6372 _ => RuntimeResultCacheBackend::Legacy,
6373 }
6374 }
6375
6376 pub(super) fn get_result_cache_entry(&self, key: &str) -> Option<RuntimeQueryResult> {
6377 match self.result_cache_backend() {
6378 RuntimeResultCacheBackend::Legacy => self.get_legacy_result_cache_entry(key),
6379 RuntimeResultCacheBackend::BlobCache => self.get_blob_result_cache_entry(key),
6380 RuntimeResultCacheBackend::Shadow => {
6381 let legacy = self.get_legacy_result_cache_entry(key);
6382 let blob = self.get_blob_result_cache_entry(key);
6383 if let (Some(ref legacy), Some(ref blob)) = (&legacy, &blob) {
6384 if result_cache_fingerprint(legacy) != result_cache_fingerprint(blob) {
6385 self.inner
6386 .result_cache_shadow_divergences
6387 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
6388 tracing::warn!(
6389 key,
6390 metric = crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
6391 "result cache shadow backend diverged from legacy"
6392 );
6393 }
6394 }
6395 legacy
6396 }
6397 }
6398 }
6399
6400 fn get_legacy_result_cache_entry(&self, key: &str) -> Option<RuntimeQueryResult> {
6401 let cache = self.inner.result_cache.read();
6402 cache.0.get(key).and_then(|entry| {
6403 if entry.cached_at.elapsed().as_secs() < RESULT_CACHE_TTL_SECS {
6404 Some(entry.result.clone())
6405 } else {
6406 None
6407 }
6408 })
6409 }
6410
6411 fn get_blob_result_cache_entry(&self, key: &str) -> Option<RuntimeQueryResult> {
6412 let hit = self
6413 .inner
6414 .result_blob_cache
6415 .get(RESULT_CACHE_BLOB_NAMESPACE, key)?;
6416 {
6417 let cache = self.inner.result_blob_entries.read();
6418 if let Some(entry) = cache.0.get(key) {
6419 return Some(entry.result.clone());
6420 }
6421 }
6422
6423 let (result, scopes) = decode_result_cache_payload(hit.value())?;
6424 let mut cache = self.inner.result_blob_entries.write();
6425 let (ref mut map, ref mut order) = *cache;
6426 if !map.contains_key(key) {
6427 order.push_back(key.to_string());
6428 }
6429 map.insert(
6430 key.to_string(),
6431 RuntimeResultCacheEntry {
6432 result: result.clone(),
6433 cached_at: std::time::Instant::now(),
6434 scopes,
6435 },
6436 );
6437 trim_result_cache(map, order);
6438 Some(result)
6439 }
6440
6441 pub(super) fn put_result_cache_entry(&self, key: &str, entry: RuntimeResultCacheEntry) {
6442 match self.result_cache_backend() {
6443 RuntimeResultCacheBackend::Legacy => self.put_legacy_result_cache_entry(key, entry),
6444 RuntimeResultCacheBackend::BlobCache => self.put_blob_result_cache_entry(key, entry),
6445 RuntimeResultCacheBackend::Shadow => {
6446 self.put_legacy_result_cache_entry(key, entry.clone());
6447 self.put_blob_result_cache_entry(key, entry);
6448 }
6449 }
6450 }
6451
6452 fn put_legacy_result_cache_entry(&self, key: &str, entry: RuntimeResultCacheEntry) {
6453 let mut cache = self.inner.result_cache.write();
6454 let (ref mut map, ref mut order) = *cache;
6455 if !map.contains_key(key) {
6456 order.push_back(key.to_string());
6457 }
6458 map.insert(key.to_string(), entry);
6459 trim_result_cache(map, order);
6460 }
6461
6462 fn put_blob_result_cache_entry(&self, key: &str, entry: RuntimeResultCacheEntry) {
6463 let policy = crate::storage::cache::BlobCachePolicy::default()
6464 .ttl_ms(RESULT_CACHE_TTL_SECS * 1000)
6465 .priority(200);
6466 let dependencies = entry.scopes.iter().cloned().collect::<Vec<_>>();
6467 let bytes = encode_result_cache_payload(&entry)
6468 .unwrap_or_else(|| result_cache_fingerprint(&entry.result).into_bytes());
6469 let put = crate::storage::cache::BlobCachePut::new(bytes)
6470 .with_dependencies(dependencies)
6471 .with_policy(policy);
6472 if self
6473 .inner
6474 .result_blob_cache
6475 .put(RESULT_CACHE_BLOB_NAMESPACE, key, put)
6476 .is_err()
6477 {
6478 return;
6479 }
6480
6481 let mut cache = self.inner.result_blob_entries.write();
6482 let (ref mut map, ref mut order) = *cache;
6483 if !map.contains_key(key) {
6484 order.push_back(key.to_string());
6485 }
6486 map.insert(key.to_string(), entry);
6487 trim_result_cache(map, order);
6488 }
6489
6490 pub fn result_cache_shadow_divergences(&self) -> u64 {
6491 self.inner
6492 .result_cache_shadow_divergences
6493 .load(std::sync::atomic::Ordering::Relaxed)
6494 }
6495
6496 pub fn invalidate_result_cache(&self) {
6499 let mut cache = self.inner.result_cache.write();
6500 cache.0.clear();
6501 cache.1.clear();
6502 let mut blob_entries = self.inner.result_blob_entries.write();
6503 blob_entries.0.clear();
6504 blob_entries.1.clear();
6505 self.inner
6506 .result_blob_cache
6507 .invalidate_namespace(RESULT_CACHE_BLOB_NAMESPACE);
6508 }
6509
6510 pub(crate) fn invalidate_result_cache_for_table(&self, table: &str) {
6513 let legacy_has_match = {
6516 let cache = self.inner.result_cache.read();
6517 let (ref map, _) = *cache;
6518 !map.is_empty() && map.values().any(|entry| entry.scopes.contains(table))
6519 };
6520 let blob_has_match = {
6521 let cache = self.inner.result_blob_entries.read();
6522 let (ref map, _) = *cache;
6523 !map.is_empty() && map.values().any(|entry| entry.scopes.contains(table))
6524 };
6525 if legacy_has_match {
6526 let mut cache = self.inner.result_cache.write();
6527 let (ref mut map, ref mut order) = *cache;
6528 map.retain(|_, entry| !entry.scopes.contains(table));
6529 order.retain(|key| map.contains_key(key));
6530 }
6531
6532 if matches!(
6533 self.result_cache_backend(),
6534 RuntimeResultCacheBackend::BlobCache | RuntimeResultCacheBackend::Shadow
6535 ) {
6536 let mut blob_entries = self.inner.result_blob_entries.write();
6537 let (ref mut blob_map, ref mut blob_order) = *blob_entries;
6538 blob_map.clear();
6539 blob_order.clear();
6540 self.inner
6541 .result_blob_cache
6542 .invalidate_namespace(RESULT_CACHE_BLOB_NAMESPACE);
6543 } else if blob_has_match {
6544 let mut blob_entries = self.inner.result_blob_entries.write();
6545 let (ref mut blob_map, ref mut blob_order) = *blob_entries;
6546 blob_map.retain(|_, entry| !entry.scopes.contains(table));
6547 blob_order.retain(|key| blob_map.contains_key(key));
6548 }
6549 }
6550
6551 pub(crate) fn invalidate_plan_cache(&self) {
6552 self.inner.query_cache.write().clear();
6553 self.inner
6554 .ddl_epoch
6555 .fetch_add(1, std::sync::atomic::Ordering::Release);
6556 }
6557
6558 pub fn ddl_epoch(&self) -> u64 {
6562 self.inner
6563 .ddl_epoch
6564 .load(std::sync::atomic::Ordering::Acquire)
6565 }
6566
6567 pub(crate) fn clear_table_planner_stats(&self, table: &str) {
6568 let store = self.inner.db.store();
6569 crate::storage::query::planner::stats_catalog::clear_table_stats(store.as_ref(), table);
6570 self.invalidate_plan_cache();
6571 }
6572
6573 pub(crate) fn rehydrate_tenant_tables(&self) {
6582 let store = self.inner.db.store();
6583 let Some(manager) = store.get_collection("red_config") else {
6584 return;
6585 };
6586 for entity in manager.query_all(|_| true) {
6591 let crate::storage::unified::entity::EntityData::Row(row) = &entity.data else {
6592 continue;
6593 };
6594 let Some(named) = &row.named else { continue };
6595 let Some(crate::storage::schema::Value::Text(key)) = named.get("key") else {
6596 continue;
6597 };
6598 let Some(rest) = key.strip_prefix("tenant_tables.") else {
6600 continue;
6601 };
6602 let Some((table, suffix)) = rest.rsplit_once('.') else {
6603 crate::telemetry::operator_event::OperatorEvent::SchemaCorruption {
6609 collection: "red_config".to_string(),
6610 detail: format!("malformed tenant_tables key: {key}"),
6611 }
6612 .emit_global();
6613 continue;
6614 };
6615 if suffix != "column" {
6616 crate::telemetry::operator_event::OperatorEvent::SchemaCorruption {
6617 collection: "red_config".to_string(),
6618 detail: format!("unexpected tenant_tables suffix: {key}"),
6619 }
6620 .emit_global();
6621 continue;
6622 }
6623 match named.get("value") {
6624 Some(crate::storage::schema::Value::Text(column)) => {
6625 self.register_tenant_table(table, column);
6626 }
6627 Some(crate::storage::schema::Value::Null) | None => {
6629 self.unregister_tenant_table(table);
6630 }
6631 _ => {}
6632 }
6633 }
6634 }
6635
6636 pub fn register_tenant_table(&self, table: &str, column: &str) {
6641 use crate::storage::query::ast::{
6642 CompareOp, CreatePolicyQuery, Expr, FieldRef, Filter, Span,
6643 };
6644 self.inner
6645 .tenant_tables
6646 .write()
6647 .insert(table.to_string(), column.to_string());
6648
6649 let lhs = Expr::Column {
6655 field: FieldRef::TableColumn {
6656 table: table.to_string(),
6657 column: column.to_string(),
6658 },
6659 span: Span::synthetic(),
6660 };
6661 let rhs = Expr::FunctionCall {
6662 name: "CURRENT_TENANT".to_string(),
6663 args: Vec::new(),
6664 span: Span::synthetic(),
6665 };
6666 let policy_filter = Filter::CompareExpr {
6667 lhs,
6668 op: CompareOp::Eq,
6669 rhs,
6670 };
6671
6672 let policy = CreatePolicyQuery {
6673 name: "__tenant_iso".to_string(),
6674 table: table.to_string(),
6675 action: None, role: None, using: Box::new(policy_filter),
6678 target_kind: crate::storage::query::ast::PolicyTargetKind::Table,
6685 };
6686
6687 self.inner.rls_policies.write().insert(
6689 (table.to_string(), "__tenant_iso".to_string()),
6690 Arc::new(policy),
6691 );
6692 self.inner
6693 .rls_enabled_tables
6694 .write()
6695 .insert(table.to_string());
6696
6697 self.ensure_tenant_index(table, column);
6703 }
6704
6705 fn ensure_tenant_index(&self, table: &str, column: &str) {
6713 if column.contains('.') {
6714 return;
6715 }
6716 let index_name = format!("__tenant_idx_{table}");
6717 let registry = self.inner.index_store.list_indices(table);
6718 if registry.iter().any(|idx| idx.name == index_name) {
6719 return;
6720 }
6721 if registry
6722 .iter()
6723 .any(|idx| idx.columns.first().map(|c| c.as_str()) == Some(column))
6724 {
6725 return;
6726 }
6727
6728 let store = self.inner.db.store();
6729 let Some(manager) = store.get_collection(table) else {
6730 return;
6731 };
6732 let entities = manager.query_all(|_| true);
6733 let entity_fields: Vec<(
6734 crate::storage::unified::EntityId,
6735 Vec<(String, crate::storage::schema::Value)>,
6736 )> = entities
6737 .iter()
6738 .map(|e| {
6739 let fields = match &e.data {
6740 crate::storage::EntityData::Row(row) => {
6741 if let Some(ref named) = row.named {
6742 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
6743 } else if let Some(ref schema) = row.schema {
6744 schema
6745 .iter()
6746 .zip(row.columns.iter())
6747 .map(|(k, v)| (k.clone(), v.clone()))
6748 .collect()
6749 } else {
6750 Vec::new()
6751 }
6752 }
6753 crate::storage::EntityData::Node(node) => node
6754 .properties
6755 .iter()
6756 .map(|(k, v)| (k.clone(), v.clone()))
6757 .collect(),
6758 _ => Vec::new(),
6759 };
6760 (e.id, fields)
6761 })
6762 .collect();
6763
6764 let columns = vec![column.to_string()];
6765 if self
6766 .inner
6767 .index_store
6768 .create_index(
6769 &index_name,
6770 table,
6771 &columns,
6772 super::index_store::IndexMethodKind::Hash,
6773 false,
6774 &entity_fields,
6775 )
6776 .is_err()
6777 {
6778 return;
6779 }
6780 self.inner
6781 .index_store
6782 .register(super::index_store::RegisteredIndex {
6783 name: index_name,
6784 collection: table.to_string(),
6785 columns,
6786 method: super::index_store::IndexMethodKind::Hash,
6787 unique: false,
6788 });
6789 self.invalidate_plan_cache();
6790 }
6791
6792 fn drop_tenant_index(&self, table: &str) {
6795 let index_name = format!("__tenant_idx_{table}");
6796 self.inner.index_store.drop_index(&index_name, table);
6797 }
6798
6799 pub fn tenant_column(&self, table: &str) -> Option<String> {
6803 self.inner.tenant_tables.read().get(table).cloned()
6804 }
6805
6806 pub fn unregister_tenant_table(&self, table: &str) {
6810 self.inner.tenant_tables.write().remove(table);
6811 self.inner
6812 .rls_policies
6813 .write()
6814 .remove(&(table.to_string(), "__tenant_iso".to_string()));
6815 self.drop_tenant_index(table);
6816 let has_other_policies = self
6818 .inner
6819 .rls_policies
6820 .read()
6821 .keys()
6822 .any(|(t, _)| t == table);
6823 if !has_other_policies {
6824 self.inner.rls_enabled_tables.write().remove(table);
6825 }
6826 }
6827
6828 pub(crate) fn record_pending_tombstone(
6834 &self,
6835 conn_id: u64,
6836 collection: &str,
6837 id: crate::storage::unified::entity::EntityId,
6838 stamper_xid: crate::storage::transaction::snapshot::Xid,
6839 ) {
6840 self.inner
6841 .pending_tombstones
6842 .write()
6843 .entry(conn_id)
6844 .or_default()
6845 .push((collection.to_string(), id, stamper_xid));
6846 }
6847
6848 pub(crate) fn finalize_pending_tombstones(&self, conn_id: u64) {
6851 let Some(pending) = self.inner.pending_tombstones.write().remove(&conn_id) else {
6852 return;
6853 };
6854 if pending.is_empty() {
6855 return;
6856 }
6857
6858 let mut grouped: HashMap<String, Vec<crate::storage::unified::entity::EntityId>> =
6860 HashMap::new();
6861 for (collection, id, _xid) in pending {
6862 grouped.entry(collection).or_default().push(id);
6863 }
6864
6865 let store = self.inner.db.store();
6866 for (collection, ids) in grouped {
6867 if let Err(err) = store.delete_batch(&collection, &ids) {
6868 eprintln!(
6872 "pending tombstone delete_batch failed for {collection}: {err}; \
6873 rows stay xmax-stamped (reader-invisible) until VACUUM"
6874 );
6875 continue;
6876 }
6877 for id in &ids {
6878 store.context_index().remove_entity(*id);
6879 self.cdc_emit(
6880 crate::replication::cdc::ChangeOperation::Delete,
6881 &collection,
6882 id.raw(),
6883 "entity",
6884 );
6885 }
6886 }
6887 }
6888
6889 pub(crate) fn revive_pending_tombstones(&self, conn_id: u64) {
6896 let Some(pending) = self.inner.pending_tombstones.write().remove(&conn_id) else {
6897 return;
6898 };
6899
6900 let store = self.inner.db.store();
6901 for (collection, id, _xid) in pending {
6902 let Some(manager) = store.get_collection(&collection) else {
6903 continue;
6904 };
6905 if let Some(mut entity) = manager.get(id) {
6906 entity.set_xmax(0);
6907 let _ = manager.update(entity);
6908 }
6909 }
6910 }
6911
6912 pub(crate) fn finalize_pending_kv_watch_events(&self, conn_id: u64) {
6913 let Some(pending) = self.inner.pending_kv_watch_events.write().remove(&conn_id) else {
6914 return;
6915 };
6916 for event in pending {
6917 self.cdc_emit_kv(
6918 event.op,
6919 &event.collection,
6920 &event.key,
6921 0,
6922 event.before,
6923 event.after,
6924 );
6925 }
6926 }
6927
6928 pub(crate) fn discard_pending_kv_watch_events(&self, conn_id: u64) {
6929 self.inner.pending_kv_watch_events.write().remove(&conn_id);
6930 }
6931
6932 fn materialize_graph_with_rls(
6941 &self,
6942 ) -> RedDBResult<(
6943 crate::storage::engine::GraphStore,
6944 std::collections::HashMap<
6945 String,
6946 std::collections::HashMap<String, crate::storage::schema::Value>,
6947 >,
6948 )> {
6949 use crate::storage::engine::GraphStore;
6950 use crate::storage::query::ast::{PolicyAction, PolicyTargetKind};
6951 use crate::storage::unified::entity::{EntityData, EntityKind};
6952 use std::collections::{HashMap, HashSet};
6953
6954 let store = self.inner.db.store();
6955 let snap_ctx = capture_current_snapshot();
6956 let role = current_auth_identity().map(|(_, r)| r.as_str().to_string());
6957
6958 let graph = GraphStore::new();
6959 let mut node_properties: HashMap<String, HashMap<String, crate::storage::schema::Value>> =
6960 HashMap::new();
6961 let mut allowed_nodes: HashSet<String> = HashSet::new();
6962
6963 let mut node_rls: HashMap<String, Option<crate::storage::query::ast::Filter>> =
6967 HashMap::new();
6968 let mut edge_rls: HashMap<String, Option<crate::storage::query::ast::Filter>> =
6969 HashMap::new();
6970
6971 let collections = store.list_collections();
6972
6973 for collection in &collections {
6975 let Some(manager) = store.get_collection(collection) else {
6976 continue;
6977 };
6978 let entities = manager.query_all(|_| true);
6979 for entity in entities {
6980 if !entity_visible_with_context(snap_ctx.as_ref(), &entity) {
6981 continue;
6982 }
6983 let EntityKind::GraphNode(ref node) = entity.kind else {
6984 continue;
6985 };
6986 if !node_passes_rls(self, collection, role.as_deref(), &mut node_rls, &entity) {
6987 continue;
6988 }
6989 let id_str = entity.id.raw().to_string();
6990 graph
6991 .add_node_with_label(
6992 &id_str,
6993 &node.label,
6994 &super::graph_node_label(&node.node_type),
6995 )
6996 .map_err(|err| RedDBError::Query(err.to_string()))?;
6997 allowed_nodes.insert(id_str.clone());
6998 if let EntityData::Node(node_data) = &entity.data {
6999 node_properties.insert(id_str, node_data.properties.clone());
7000 }
7001 }
7002 }
7003
7004 for collection in &collections {
7008 let Some(manager) = store.get_collection(collection) else {
7009 continue;
7010 };
7011 let entities = manager.query_all(|_| true);
7012 for entity in entities {
7013 if !entity_visible_with_context(snap_ctx.as_ref(), &entity) {
7014 continue;
7015 }
7016 let EntityKind::GraphEdge(ref edge) = entity.kind else {
7017 continue;
7018 };
7019 if !allowed_nodes.contains(&edge.from_node)
7020 || !allowed_nodes.contains(&edge.to_node)
7021 {
7022 continue;
7023 }
7024 if !edge_passes_rls(self, collection, role.as_deref(), &mut edge_rls, &entity) {
7025 continue;
7026 }
7027 let weight = match &entity.data {
7028 EntityData::Edge(e) => e.weight,
7029 _ => edge.weight as f32 / 1000.0,
7030 };
7031 graph
7032 .add_edge_with_label(
7033 &edge.from_node,
7034 &edge.to_node,
7035 &super::graph_edge_label(&edge.label),
7036 weight,
7037 )
7038 .map_err(|err| RedDBError::Query(err.to_string()))?;
7039 }
7040 }
7041
7042 let _ = (PolicyAction::Select, PolicyTargetKind::Nodes);
7046
7047 Ok((graph, node_properties))
7048 }
7049
7050 pub(crate) fn stamp_xmin_if_in_txn(
7065 &self,
7066 collection: &str,
7067 id: crate::storage::unified::entity::EntityId,
7068 ) {
7069 let Some(xid) = self.current_xid() else {
7070 return;
7071 };
7072 let store = self.inner.db.store();
7073 let Some(manager) = store.get_collection(collection) else {
7074 return;
7075 };
7076 if let Some(mut entity) = manager.get(id) {
7077 entity.set_xmin(xid);
7078 let _ = manager.update(entity);
7079 }
7080 }
7081
7082 pub(crate) fn revive_tombstones_since(&self, conn_id: u64, stamper_xid: u64) -> usize {
7090 let mut guard = self.inner.pending_tombstones.write();
7091 let Some(pending) = guard.get_mut(&conn_id) else {
7092 return 0;
7093 };
7094
7095 let store = self.inner.db.store();
7096 let mut revived = 0usize;
7097 pending.retain(|(collection, id, xid)| {
7098 if *xid < stamper_xid {
7099 return true;
7101 }
7102 if let Some(manager) = store.get_collection(collection) {
7103 if let Some(mut entity) = manager.get(*id) {
7104 entity.set_xmax(0);
7105 let _ = manager.update(entity);
7106 revived += 1;
7107 }
7108 }
7109 false
7110 });
7111 if pending.is_empty() {
7112 guard.remove(&conn_id);
7113 }
7114 revived
7115 }
7116
7117 pub fn current_snapshot(&self) -> crate::storage::transaction::snapshot::Snapshot {
7126 let conn_id = current_connection_id();
7127 if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id).cloned() {
7128 return ctx.snapshot;
7129 }
7130 let high_water = self.inner.snapshot_manager.peek_next_xid();
7136 self.inner.snapshot_manager.snapshot(high_water)
7137 }
7138
7139 pub fn current_xid(&self) -> Option<crate::storage::transaction::snapshot::Xid> {
7149 let conn_id = current_connection_id();
7150 self.inner
7151 .tx_contexts
7152 .read()
7153 .get(&conn_id)
7154 .map(|ctx| ctx.writer_xid())
7155 }
7156
7157 pub fn snapshot_manager(&self) -> Arc<crate::storage::transaction::snapshot::SnapshotManager> {
7160 Arc::clone(&self.inner.snapshot_manager)
7161 }
7162
7163 pub fn current_txn_own_xids(
7168 &self,
7169 ) -> std::collections::HashSet<crate::storage::transaction::snapshot::Xid> {
7170 let mut set = std::collections::HashSet::new();
7171 if let Some(ctx) = self.inner.tx_contexts.read().get(¤t_connection_id()) {
7172 set.insert(ctx.xid);
7173 for (_, sub) in &ctx.savepoints {
7174 set.insert(*sub);
7175 }
7176 }
7177 set
7178 }
7179
7180 pub fn foreign_tables(&self) -> Arc<crate::storage::fdw::ForeignTableRegistry> {
7187 Arc::clone(&self.inner.foreign_tables)
7188 }
7189
7190 pub fn is_rls_enabled(&self, table: &str) -> bool {
7192 self.inner.rls_enabled_tables.read().contains(table)
7193 }
7194
7195 pub fn matching_rls_policies(
7202 &self,
7203 table: &str,
7204 role: Option<&str>,
7205 action: crate::storage::query::ast::PolicyAction,
7206 ) -> Vec<crate::storage::query::ast::Filter> {
7207 self.matching_rls_policies_for_kind(
7212 table,
7213 role,
7214 action,
7215 crate::storage::query::ast::PolicyTargetKind::Table,
7216 )
7217 }
7218
7219 pub fn matching_rls_policies_for_kind(
7227 &self,
7228 table: &str,
7229 role: Option<&str>,
7230 action: crate::storage::query::ast::PolicyAction,
7231 kind: crate::storage::query::ast::PolicyTargetKind,
7232 ) -> Vec<crate::storage::query::ast::Filter> {
7233 if !self.is_rls_enabled(table) {
7234 return Vec::new();
7235 }
7236 let policies = self.inner.rls_policies.read();
7237 policies
7238 .iter()
7239 .filter_map(|((t, _), p)| {
7240 if t != table {
7241 return None;
7242 }
7243 if p.target_kind != kind
7252 && p.target_kind != crate::storage::query::ast::PolicyTargetKind::Table
7253 {
7254 return None;
7255 }
7256 if let Some(a) = p.action {
7258 if a != action {
7259 return None;
7260 }
7261 }
7262 if let Some(p_role) = p.role.as_deref() {
7264 match role {
7265 Some(r) if r == p_role => {}
7266 _ => return None,
7267 }
7268 }
7269 Some((*p.using).clone())
7270 })
7271 .collect()
7272 }
7273
7274 pub(crate) fn refresh_table_planner_stats(&self, table: &str) {
7275 let store = self.inner.db.store();
7276 if let Some(stats) =
7277 crate::storage::query::planner::stats_catalog::analyze_collection(store.as_ref(), table)
7278 {
7279 crate::storage::query::planner::stats_catalog::persist_table_stats(
7280 store.as_ref(),
7281 &stats,
7282 );
7283 } else {
7284 crate::storage::query::planner::stats_catalog::clear_table_stats(store.as_ref(), table);
7285 }
7286 self.invalidate_plan_cache();
7287 }
7288
7289 pub(crate) fn note_table_write(&self, table: &str) {
7290 let already_dirty = self.inner.planner_dirty_tables.read().contains(table);
7295 if !already_dirty {
7296 self.inner
7297 .planner_dirty_tables
7298 .write()
7299 .insert(table.to_string());
7300 }
7301 self.invalidate_result_cache_for_table(table);
7302 }
7303
7304 fn explain_as_rows(&self, raw_query: &str, inner_sql: &str) -> RedDBResult<RuntimeQueryResult> {
7312 let explain = self.explain_query(inner_sql)?;
7313
7314 let columns = vec![
7315 "op".to_string(),
7316 "source".to_string(),
7317 "est_rows".to_string(),
7318 "est_cost".to_string(),
7319 "depth".to_string(),
7320 ];
7321
7322 let mut records: Vec<crate::storage::query::unified::UnifiedRecord> = Vec::new();
7323
7324 for name in &explain.cte_materializations {
7330 use std::sync::Arc;
7331 let mut rec = crate::storage::query::unified::UnifiedRecord::default();
7332 rec.set_arc(Arc::from("op"), Value::text("CteScan".to_string()));
7333 rec.set_arc(Arc::from("source"), Value::text(name.clone()));
7334 rec.set_arc(Arc::from("est_rows"), Value::Float(0.0));
7335 rec.set_arc(Arc::from("est_cost"), Value::Float(0.0));
7336 rec.set_arc(Arc::from("depth"), Value::Integer(0));
7337 records.push(rec);
7338 }
7339
7340 walk_plan_node(&explain.logical_plan.root, 0, &mut records);
7341
7342 let result = crate::storage::query::unified::UnifiedResult {
7343 columns,
7344 records,
7345 stats: Default::default(),
7346 pre_serialized_json: None,
7347 };
7348
7349 Ok(RuntimeQueryResult {
7350 query: raw_query.to_string(),
7351 mode: explain.mode,
7352 statement: "explain",
7353 engine: "runtime-explain",
7354 result,
7355 affected_rows: 0,
7356 statement_type: "select",
7357 })
7358 }
7359
7360 pub(super) fn check_query_privilege(
7368 &self,
7369 expr: &crate::storage::query::ast::QueryExpr,
7370 ) -> Result<(), String> {
7371 use crate::auth::privileges::{Action, AuthzContext, Resource};
7372 use crate::auth::UserId;
7373 use crate::storage::query::ast::QueryExpr;
7374
7375 let auth_store = match self.inner.auth_store.read().clone() {
7380 Some(s) => s,
7381 None => return Ok(()),
7382 };
7383
7384 let (username, role) = match current_auth_identity() {
7390 Some(p) => p,
7391 None => return Ok(()),
7392 };
7393 let tenant = current_tenant();
7394
7395 let ctx = AuthzContext {
7396 principal: &username,
7397 effective_role: role,
7398 tenant: tenant.as_deref(),
7399 };
7400 let principal_id = UserId::from_parts(tenant.as_deref(), &username);
7401
7402 let (action, resource) = match expr {
7404 QueryExpr::Table(t) => (Action::Select, Resource::table_from_name(&t.table)),
7405 QueryExpr::QueueSelect(q) => (Action::Select, Resource::table_from_name(&q.queue)),
7406 QueryExpr::Graph(g) => {
7407 if auth_store.iam_authorization_enabled() {
7408 self.check_graph_property_projection_privilege(
7409 &auth_store,
7410 &principal_id,
7411 role,
7412 tenant.as_deref(),
7413 g,
7414 )?;
7415 return Ok(());
7416 }
7417 return Ok(());
7418 }
7419 QueryExpr::Vector(v) => {
7420 if auth_store.iam_authorization_enabled() {
7421 self.check_table_like_column_projection_privilege(
7422 &auth_store,
7423 &principal_id,
7424 role,
7425 tenant.as_deref(),
7426 &v.collection,
7427 &["content".to_string()],
7428 )?;
7429 return Ok(());
7430 }
7431 return Ok(());
7432 }
7433 QueryExpr::Insert(i) => (Action::Insert, Resource::table_from_name(&i.table)),
7434 QueryExpr::Update(u) => (Action::Update, Resource::table_from_name(&u.table)),
7435 QueryExpr::Delete(d) => (Action::Delete, Resource::table_from_name(&d.table)),
7436 QueryExpr::Join(_) => (Action::Select, Resource::Database),
7440 QueryExpr::Grant(_) | QueryExpr::Revoke(_) | QueryExpr::AlterUser(_) => {
7443 return if role == crate::auth::Role::Admin {
7444 Ok(())
7445 } else {
7446 Err(format!(
7447 "principal=`{}` role=`{:?}` cannot issue ACL/auth DDL",
7448 username, role
7449 ))
7450 };
7451 }
7452 QueryExpr::CreateIamPolicy { id, .. } => {
7453 return self.check_policy_management_privilege(
7454 &auth_store,
7455 &principal_id,
7456 role,
7457 tenant.as_deref(),
7458 "policy:put",
7459 "policy",
7460 id,
7461 );
7462 }
7463 QueryExpr::DropIamPolicy { id } => {
7464 return self.check_policy_management_privilege(
7465 &auth_store,
7466 &principal_id,
7467 role,
7468 tenant.as_deref(),
7469 "policy:drop",
7470 "policy",
7471 id,
7472 );
7473 }
7474 QueryExpr::AttachPolicy { policy_id, .. } => {
7475 return self.check_policy_management_privilege(
7476 &auth_store,
7477 &principal_id,
7478 role,
7479 tenant.as_deref(),
7480 "policy:attach",
7481 "policy",
7482 policy_id,
7483 );
7484 }
7485 QueryExpr::DetachPolicy { policy_id, .. } => {
7486 return self.check_policy_management_privilege(
7487 &auth_store,
7488 &principal_id,
7489 role,
7490 tenant.as_deref(),
7491 "policy:detach",
7492 "policy",
7493 policy_id,
7494 );
7495 }
7496 QueryExpr::ShowPolicies { .. } | QueryExpr::ShowEffectivePermissions { .. } => {
7497 return Ok(());
7498 }
7499 QueryExpr::SimulatePolicy { .. } => {
7500 return self.check_policy_management_privilege(
7501 &auth_store,
7502 &principal_id,
7503 role,
7504 tenant.as_deref(),
7505 "policy:simulate",
7506 "policy",
7507 "*",
7508 );
7509 }
7510 QueryExpr::DropTable(q) => {
7513 return self.check_ddl_collection_privilege(
7514 &auth_store,
7515 &principal_id,
7516 role,
7517 tenant.as_deref(),
7518 &username,
7519 "drop",
7520 &q.name,
7521 );
7522 }
7523 QueryExpr::DropGraph(q) => {
7524 return self.check_ddl_collection_privilege(
7525 &auth_store,
7526 &principal_id,
7527 role,
7528 tenant.as_deref(),
7529 &username,
7530 "drop",
7531 &q.name,
7532 );
7533 }
7534 QueryExpr::DropVector(q) => {
7535 return self.check_ddl_collection_privilege(
7536 &auth_store,
7537 &principal_id,
7538 role,
7539 tenant.as_deref(),
7540 &username,
7541 "drop",
7542 &q.name,
7543 );
7544 }
7545 QueryExpr::DropDocument(q) => {
7546 return self.check_ddl_collection_privilege(
7547 &auth_store,
7548 &principal_id,
7549 role,
7550 tenant.as_deref(),
7551 &username,
7552 "drop",
7553 &q.name,
7554 );
7555 }
7556 QueryExpr::DropKv(q) => {
7557 return self.check_ddl_collection_privilege(
7558 &auth_store,
7559 &principal_id,
7560 role,
7561 tenant.as_deref(),
7562 &username,
7563 "drop",
7564 &q.name,
7565 );
7566 }
7567 QueryExpr::DropCollection(q) => {
7568 return self.check_ddl_collection_privilege(
7569 &auth_store,
7570 &principal_id,
7571 role,
7572 tenant.as_deref(),
7573 &username,
7574 "drop",
7575 &q.name,
7576 );
7577 }
7578 QueryExpr::Truncate(q) => {
7579 return self.check_ddl_collection_privilege(
7580 &auth_store,
7581 &principal_id,
7582 role,
7583 tenant.as_deref(),
7584 &username,
7585 "truncate",
7586 &q.name,
7587 );
7588 }
7589 QueryExpr::CreateTable(_)
7591 | QueryExpr::AlterTable(_)
7592 | QueryExpr::CreateIndex(_)
7593 | QueryExpr::DropIndex(_)
7594 | QueryExpr::CreateSchema(_)
7595 | QueryExpr::DropSchema(_)
7596 | QueryExpr::CreateSequence(_)
7597 | QueryExpr::DropSequence(_)
7598 | QueryExpr::CreateView(_)
7599 | QueryExpr::DropView(_)
7600 | QueryExpr::RefreshMaterializedView(_)
7601 | QueryExpr::CreatePolicy(_)
7602 | QueryExpr::DropPolicy(_)
7603 | QueryExpr::CreateServer(_)
7604 | QueryExpr::DropServer(_)
7605 | QueryExpr::CreateForeignTable(_)
7606 | QueryExpr::DropForeignTable(_)
7607 | QueryExpr::CreateTimeSeries(_)
7608 | QueryExpr::DropTimeSeries(_)
7609 | QueryExpr::CreateQueue(_)
7610 | QueryExpr::AlterQueue(_)
7611 | QueryExpr::DropQueue(_)
7612 | QueryExpr::CreateTree(_)
7613 | QueryExpr::DropTree(_) => {
7614 return if role >= crate::auth::Role::Write {
7615 Ok(())
7616 } else {
7617 Err(format!(
7618 "principal=`{}` role=`{:?}` cannot issue DDL",
7619 username, role
7620 ))
7621 };
7622 }
7623 QueryExpr::CreateMigration(_) => {
7625 return if role >= crate::auth::Role::Write {
7626 Ok(())
7627 } else {
7628 Err(format!(
7629 "principal=`{}` role=`{:?}` cannot issue CREATE MIGRATION",
7630 username, role
7631 ))
7632 };
7633 }
7634 QueryExpr::ApplyMigration(_) | QueryExpr::RollbackMigration(_) => {
7636 return if role == crate::auth::Role::Admin {
7637 Ok(())
7638 } else {
7639 Err(format!(
7640 "principal=`{}` role=`{:?}` cannot issue APPLY/ROLLBACK MIGRATION",
7641 username, role
7642 ))
7643 };
7644 }
7645 QueryExpr::ExplainMigration(_) => return Ok(()),
7647 _ => return Ok(()),
7651 };
7652
7653 if auth_store.iam_authorization_enabled() {
7654 let iam_action = legacy_action_to_iam(action);
7655 let iam_resource = legacy_resource_to_iam(&resource, tenant.as_deref());
7656 let iam_ctx = runtime_iam_context(role, tenant.as_deref());
7657 if !auth_store.check_policy_authz(&principal_id, iam_action, &iam_resource, &iam_ctx) {
7658 return Err(format!(
7659 "principal=`{}` action=`{}` resource=`{}:{}` denied by IAM policy",
7660 username, iam_action, iam_resource.kind, iam_resource.name
7661 ));
7662 }
7663
7664 if let QueryExpr::Table(table) = expr {
7665 self.check_table_column_projection_privilege(
7666 &auth_store,
7667 &principal_id,
7668 &iam_ctx,
7669 table,
7670 )?;
7671 }
7672
7673 if let QueryExpr::Update(update) = expr {
7674 let columns = update_set_target_columns(update);
7675 if !columns.is_empty() {
7676 let request = column_access_request_for_table_update(&update.table, columns);
7677 let outcome =
7678 auth_store.check_column_projection_authz(&principal_id, &request, &iam_ctx);
7679 if let Some(denied) = outcome.first_denied_column() {
7680 return Err(format!(
7681 "principal=`{}` action=`{}` resource=`{}:{}` denied by IAM column policy",
7682 username, iam_action, denied.resource.kind, denied.resource.name
7683 ));
7684 }
7685 if !outcome.allowed() {
7686 return Err(format!(
7687 "principal=`{}` action=`{}` resource=`{}:{}` denied by IAM policy",
7688 username,
7689 iam_action,
7690 outcome.table_resource.kind,
7691 outcome.table_resource.name
7692 ));
7693 }
7694 }
7695 }
7696
7697 Ok(())
7698 } else {
7699 auth_store
7700 .check_grant(&ctx, action, &resource)
7701 .map_err(|e| e.to_string())
7702 }
7703 }
7704
7705 fn check_table_column_projection_privilege(
7706 &self,
7707 auth_store: &Arc<crate::auth::store::AuthStore>,
7708 principal: &crate::auth::UserId,
7709 ctx: &crate::auth::policies::EvalContext,
7710 table: &crate::storage::query::ast::TableQuery,
7711 ) -> Result<(), String> {
7712 use crate::auth::{ColumnAccessRequest, ColumnDecisionEffect};
7713
7714 let columns = requested_table_columns_for_policy(table);
7715 if columns.is_empty() {
7716 return Ok(());
7717 }
7718
7719 let request = ColumnAccessRequest::select(table.table.clone(), columns);
7720 let outcome = auth_store.check_column_projection_authz(principal, &request, ctx);
7721 if outcome.allowed() {
7722 return Ok(());
7723 }
7724
7725 if !matches!(
7726 outcome.table_decision,
7727 crate::auth::policies::Decision::Allow { .. }
7728 | crate::auth::policies::Decision::AdminBypass
7729 ) {
7730 return Err(format!(
7731 "principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
7732 principal, outcome.table_resource.kind, outcome.table_resource.name
7733 ));
7734 }
7735
7736 let denied = outcome
7737 .first_denied_column()
7738 .filter(|decision| decision.effective == ColumnDecisionEffect::Denied);
7739 match denied {
7740 Some(decision) => Err(format!(
7741 "principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
7742 principal, decision.resource.kind, decision.resource.name
7743 )),
7744 None => Ok(()),
7745 }
7746 }
7747
7748 fn check_graph_property_projection_privilege(
7749 &self,
7750 auth_store: &Arc<crate::auth::store::AuthStore>,
7751 principal: &crate::auth::UserId,
7752 role: crate::auth::Role,
7753 tenant: Option<&str>,
7754 query: &crate::storage::query::ast::GraphQuery,
7755 ) -> Result<(), String> {
7756 let columns = explicit_graph_projection_properties(query);
7757 if columns.is_empty() {
7758 return Ok(());
7759 }
7760 self.check_table_like_column_projection_privilege(
7761 auth_store, principal, role, tenant, "graph", &columns,
7762 )
7763 }
7764
7765 fn check_table_like_column_projection_privilege(
7766 &self,
7767 auth_store: &Arc<crate::auth::store::AuthStore>,
7768 principal: &crate::auth::UserId,
7769 role: crate::auth::Role,
7770 tenant: Option<&str>,
7771 table: &str,
7772 columns: &[String],
7773 ) -> Result<(), String> {
7774 let iam_ctx = runtime_iam_context(role, tenant);
7775 let request =
7776 crate::auth::ColumnAccessRequest::select(table.to_string(), columns.iter().cloned());
7777 let outcome = auth_store.check_column_projection_authz(principal, &request, &iam_ctx);
7778 if outcome.allowed() {
7779 return Ok(());
7780 }
7781 let denied = outcome
7782 .first_denied_column()
7783 .map(|d| d.resource.name.clone())
7784 .unwrap_or_else(|| format!("{table}.<unknown>"));
7785 Err(format!(
7786 "principal=`{}` action=`select` resource=`column:{}` denied by IAM policy",
7787 principal, denied
7788 ))
7789 }
7790
7791 fn check_policy_management_privilege(
7792 &self,
7793 auth_store: &Arc<crate::auth::store::AuthStore>,
7794 principal: &crate::auth::UserId,
7795 role: crate::auth::Role,
7796 tenant: Option<&str>,
7797 action: &str,
7798 resource_kind: &str,
7799 resource_name: &str,
7800 ) -> Result<(), String> {
7801 if !auth_store.iam_authorization_enabled() {
7802 return if role == crate::auth::Role::Admin {
7803 Ok(())
7804 } else {
7805 Err(format!(
7806 "principal=`{}` role=`{:?}` cannot issue ACL/auth DDL",
7807 principal, role
7808 ))
7809 };
7810 }
7811
7812 let mut resource = crate::auth::policies::ResourceRef::new(
7813 resource_kind.to_string(),
7814 resource_name.to_string(),
7815 );
7816 if let Some(t) = tenant {
7817 resource = resource.with_tenant(t.to_string());
7818 }
7819 let ctx = runtime_iam_context(role, tenant);
7820 if auth_store.check_policy_authz(principal, action, &resource, &ctx) {
7821 Ok(())
7822 } else {
7823 Err(format!(
7824 "principal=`{}` action=`{}` resource=`{}:{}` denied by IAM policy",
7825 principal, action, resource.kind, resource.name
7826 ))
7827 }
7828 }
7829
7830 fn check_ddl_collection_privilege(
7837 &self,
7838 auth_store: &Arc<crate::auth::store::AuthStore>,
7839 principal: &crate::auth::UserId,
7840 role: crate::auth::Role,
7841 tenant: Option<&str>,
7842 username: &str,
7843 action: &str,
7844 collection: &str,
7845 ) -> Result<(), String> {
7846 if role < crate::auth::Role::Write {
7847 let msg = format!(
7848 "principal=`{}` role=`{:?}` cannot issue DDL",
7849 username, role
7850 );
7851 self.inner.audit_log.record(
7852 action,
7853 username,
7854 collection,
7855 "denied",
7856 crate::json::Value::Null,
7857 );
7858 return Err(msg);
7859 }
7860
7861 if !auth_store.iam_authorization_enabled() {
7862 self.inner.audit_log.record(
7863 action,
7864 username,
7865 collection,
7866 "ok",
7867 crate::json::Value::Null,
7868 );
7869 return Ok(());
7870 }
7871
7872 let resource_name = collection.to_string();
7873 let mut resource = crate::auth::policies::ResourceRef::new(
7874 "collection".to_string(),
7875 resource_name.clone(),
7876 );
7877 if let Some(t) = tenant {
7878 resource = resource.with_tenant(t.to_string());
7879 }
7880 let ctx = runtime_iam_context(role, tenant);
7881 if auth_store.check_policy_authz(principal, action, &resource, &ctx) {
7882 self.inner.audit_log.record(
7883 action,
7884 username,
7885 &resource_name,
7886 "ok",
7887 crate::json::Value::Null,
7888 );
7889 Ok(())
7890 } else {
7891 self.inner.audit_log.record(
7892 action,
7893 username,
7894 &resource_name,
7895 "denied",
7896 crate::json::Value::Null,
7897 );
7898 Err(format!(
7899 "principal=`{}` action=`{}` resource=`collection:{}` denied by IAM policy",
7900 username, action, resource_name
7901 ))
7902 }
7903 }
7904
7905 fn execute_grant_statement(
7907 &self,
7908 query: &str,
7909 stmt: &crate::storage::query::ast::GrantStmt,
7910 ) -> RedDBResult<RuntimeQueryResult> {
7911 use crate::auth::privileges::{Action, GrantPrincipal, Resource};
7912 use crate::auth::UserId;
7913 use crate::storage::query::ast::{GrantObjectKind, GrantPrincipalRef};
7914
7915 let auth_store = self
7916 .inner
7917 .auth_store
7918 .read()
7919 .clone()
7920 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
7921
7922 let (gname, grole) = current_auth_identity().ok_or_else(|| {
7924 RedDBError::Query("GRANT requires an authenticated principal".to_string())
7925 })?;
7926 let granter = UserId::from_parts(current_tenant().as_deref(), &gname);
7927 let granter_role = grole;
7928
7929 let mut actions: Vec<Action> = Vec::new();
7931 if stmt.all {
7932 actions.push(Action::All);
7933 } else {
7934 for kw in &stmt.actions {
7935 let a = Action::from_keyword(kw).ok_or_else(|| {
7936 RedDBError::Query(format!("unknown privilege keyword `{}`", kw))
7937 })?;
7938 actions.push(a);
7939 }
7940 }
7941
7942 let mut applied = 0usize;
7944 for obj in &stmt.objects {
7945 let resource = match stmt.object_kind {
7946 GrantObjectKind::Table => Resource::Table {
7947 schema: obj.schema.clone(),
7948 table: obj.name.clone(),
7949 },
7950 GrantObjectKind::Schema => Resource::Schema(obj.name.clone()),
7951 GrantObjectKind::Database => Resource::Database,
7952 GrantObjectKind::Function => Resource::Function {
7953 schema: obj.schema.clone(),
7954 name: obj.name.clone(),
7955 },
7956 };
7957 for principal in &stmt.principals {
7958 let p = match principal {
7959 GrantPrincipalRef::Public => GrantPrincipal::Public,
7960 GrantPrincipalRef::Group(g) => GrantPrincipal::Group(g.clone()),
7961 GrantPrincipalRef::User { tenant, name } => {
7962 GrantPrincipal::User(UserId::from_parts(tenant.as_deref(), name))
7963 }
7964 };
7965 let tenant = granter.tenant.clone();
7968 auth_store
7969 .grant(
7970 &granter,
7971 granter_role,
7972 p.clone(),
7973 resource.clone(),
7974 actions.clone(),
7975 stmt.with_grant_option,
7976 tenant.clone(),
7977 )
7978 .map_err(|e| RedDBError::Query(e.to_string()))?;
7979
7980 if let Some(policy) =
7984 grant_to_iam_policy(&p, &resource, &actions, tenant.as_deref())
7985 {
7986 let pid = policy.id.clone();
7987 auth_store
7988 .put_policy_internal(policy)
7989 .map_err(|e| RedDBError::Query(e.to_string()))?;
7990 let attachment = match &p {
7991 GrantPrincipal::User(uid) => {
7992 crate::auth::store::PrincipalRef::User(uid.clone())
7993 }
7994 GrantPrincipal::Group(group) => {
7995 crate::auth::store::PrincipalRef::Group(group.clone())
7996 }
7997 GrantPrincipal::Public => crate::auth::store::PrincipalRef::Group(
7998 crate::auth::store::PUBLIC_IAM_GROUP.to_string(),
7999 ),
8000 };
8001 auth_store
8002 .attach_policy(attachment, &pid)
8003 .map_err(|e| RedDBError::Query(e.to_string()))?;
8004 }
8005 applied += 1;
8006 tracing::info!(
8007 target: "audit",
8008 principal = %granter,
8009 action = "grant",
8010 "GRANT applied"
8011 );
8012 }
8013 }
8014
8015 self.invalidate_result_cache();
8016 Ok(RuntimeQueryResult::ok_message(
8017 query.to_string(),
8018 &format!("GRANT applied to {} target(s)", applied),
8019 "grant",
8020 ))
8021 }
8022
8023 fn execute_revoke_statement(
8025 &self,
8026 query: &str,
8027 stmt: &crate::storage::query::ast::RevokeStmt,
8028 ) -> RedDBResult<RuntimeQueryResult> {
8029 use crate::auth::privileges::{Action, GrantPrincipal, Resource};
8030 use crate::auth::UserId;
8031 use crate::storage::query::ast::{GrantObjectKind, GrantPrincipalRef};
8032
8033 let auth_store = self
8034 .inner
8035 .auth_store
8036 .read()
8037 .clone()
8038 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8039
8040 let (_gname, grole) = current_auth_identity().ok_or_else(|| {
8041 RedDBError::Query("REVOKE requires an authenticated principal".to_string())
8042 })?;
8043 let granter_role = grole;
8044
8045 let actions: Vec<Action> = if stmt.all {
8046 vec![Action::All]
8047 } else {
8048 stmt.actions
8049 .iter()
8050 .map(|kw| Action::from_keyword(kw).unwrap_or(Action::Select))
8051 .collect()
8052 };
8053
8054 let mut total_removed = 0usize;
8055 for obj in &stmt.objects {
8056 let resource = match stmt.object_kind {
8057 GrantObjectKind::Table => Resource::Table {
8058 schema: obj.schema.clone(),
8059 table: obj.name.clone(),
8060 },
8061 GrantObjectKind::Schema => Resource::Schema(obj.name.clone()),
8062 GrantObjectKind::Database => Resource::Database,
8063 GrantObjectKind::Function => Resource::Function {
8064 schema: obj.schema.clone(),
8065 name: obj.name.clone(),
8066 },
8067 };
8068 for principal in &stmt.principals {
8069 let p = match principal {
8070 GrantPrincipalRef::Public => GrantPrincipal::Public,
8071 GrantPrincipalRef::Group(g) => GrantPrincipal::Group(g.clone()),
8072 GrantPrincipalRef::User { tenant, name } => {
8073 GrantPrincipal::User(UserId::from_parts(tenant.as_deref(), name))
8074 }
8075 };
8076 let removed = auth_store
8077 .revoke(granter_role, &p, &resource, &actions)
8078 .map_err(|e| RedDBError::Query(e.to_string()))?;
8079 let _removed_policies =
8080 auth_store.delete_synthetic_grant_policies(&p, &resource, &actions);
8081 total_removed += removed;
8082 }
8083 }
8084
8085 self.invalidate_result_cache();
8086 Ok(RuntimeQueryResult::ok_message(
8087 query.to_string(),
8088 &format!("REVOKE removed {} grant(s)", total_removed),
8089 "revoke",
8090 ))
8091 }
8092
8093 fn execute_alter_user_statement(
8095 &self,
8096 query: &str,
8097 stmt: &crate::storage::query::ast::AlterUserStmt,
8098 ) -> RedDBResult<RuntimeQueryResult> {
8099 use crate::auth::privileges::UserAttributes;
8100 use crate::auth::UserId;
8101 use crate::storage::query::ast::AlterUserAttribute;
8102
8103 let auth_store = self
8104 .inner
8105 .auth_store
8106 .read()
8107 .clone()
8108 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8109
8110 let (_gname, grole) = current_auth_identity().ok_or_else(|| {
8111 RedDBError::Query("ALTER USER requires an authenticated principal".to_string())
8112 })?;
8113 if grole != crate::auth::Role::Admin {
8114 return Err(RedDBError::Query(
8115 "ALTER USER requires Admin role".to_string(),
8116 ));
8117 }
8118
8119 let target = UserId::from_parts(stmt.tenant.as_deref(), &stmt.username);
8120
8121 let mut attrs = auth_store.user_attributes(&target);
8124 let mut enable_change: Option<bool> = None;
8125
8126 for a in &stmt.attributes {
8127 match a {
8128 AlterUserAttribute::ValidUntil(ts) => {
8129 let ms = parse_timestamp_to_ms(ts).ok_or_else(|| {
8133 RedDBError::Query(format!("invalid VALID UNTIL timestamp `{ts}`"))
8134 })?;
8135 attrs.valid_until = Some(ms);
8136 }
8137 AlterUserAttribute::ConnectionLimit(n) => {
8138 if *n < 0 {
8139 return Err(RedDBError::Query(
8140 "CONNECTION LIMIT must be non-negative".to_string(),
8141 ));
8142 }
8143 attrs.connection_limit = Some(*n as u32);
8144 }
8145 AlterUserAttribute::SetSearchPath(p) => {
8146 attrs.search_path = Some(p.clone());
8147 }
8148 AlterUserAttribute::AddGroup(g) => {
8149 if !attrs.groups.iter().any(|existing| existing == g) {
8150 attrs.groups.push(g.clone());
8151 attrs.groups.sort();
8152 }
8153 }
8154 AlterUserAttribute::DropGroup(g) => {
8155 attrs.groups.retain(|existing| existing != g);
8156 }
8157 AlterUserAttribute::Enable => enable_change = Some(true),
8158 AlterUserAttribute::Disable => enable_change = Some(false),
8159 AlterUserAttribute::Password(_) => {
8160 }
8164 }
8165 }
8166
8167 auth_store
8168 .set_user_attributes(&target, attrs)
8169 .map_err(|e| RedDBError::Query(e.to_string()))?;
8170 if let Some(en) = enable_change {
8171 auth_store
8172 .set_user_enabled(&target, en)
8173 .map_err(|e| RedDBError::Query(e.to_string()))?;
8174 }
8175 self.invalidate_result_cache();
8176 tracing::info!(
8177 target: "audit",
8178 principal = %target,
8179 action = "alter_user",
8180 "ALTER USER applied"
8181 );
8182
8183 Ok(RuntimeQueryResult::ok_message(
8184 query.to_string(),
8185 &format!("ALTER USER {} applied", target),
8186 "alter_user",
8187 ))
8188 }
8189
8190 fn execute_create_iam_policy(
8195 &self,
8196 query: &str,
8197 id: &str,
8198 json: &str,
8199 ) -> RedDBResult<RuntimeQueryResult> {
8200 use crate::auth::policies::Policy;
8201
8202 let auth_store = self
8203 .inner
8204 .auth_store
8205 .read()
8206 .clone()
8207 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8208
8209 let mut policy = Policy::from_json_str(json)
8214 .map_err(|e| RedDBError::Query(format!("policy parse: {e}")))?;
8215 if policy.id != id {
8216 policy.id = id.to_string();
8217 }
8218 let pid = policy.id.clone();
8219 auth_store
8220 .put_policy(policy)
8221 .map_err(|e| RedDBError::Query(e.to_string()))?;
8222
8223 let principal = current_auth_identity()
8224 .map(|(u, _)| u)
8225 .unwrap_or_else(|| "anonymous".into());
8226 tracing::info!(
8227 target: "audit",
8228 principal = %principal,
8229 action = "iam:policy.put",
8230 matched_policy_id = %pid,
8231 "CREATE POLICY applied"
8232 );
8233 self.inner.audit_log.record(
8234 "iam/policy.put",
8235 &principal,
8236 &pid,
8237 "ok",
8238 crate::json::Value::Null,
8239 );
8240
8241 self.invalidate_result_cache();
8242 Ok(RuntimeQueryResult::ok_message(
8243 query.to_string(),
8244 &format!("policy `{pid}` stored"),
8245 "create_iam_policy",
8246 ))
8247 }
8248
8249 fn execute_drop_iam_policy(&self, query: &str, id: &str) -> RedDBResult<RuntimeQueryResult> {
8250 let auth_store = self
8251 .inner
8252 .auth_store
8253 .read()
8254 .clone()
8255 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8256 auth_store
8257 .delete_policy(id)
8258 .map_err(|e| RedDBError::Query(e.to_string()))?;
8259
8260 let principal = current_auth_identity()
8261 .map(|(u, _)| u)
8262 .unwrap_or_else(|| "anonymous".into());
8263 tracing::info!(
8264 target: "audit",
8265 principal = %principal,
8266 action = "iam:policy.drop",
8267 matched_policy_id = %id,
8268 "DROP POLICY applied"
8269 );
8270 self.inner.audit_log.record(
8271 "iam/policy.drop",
8272 &principal,
8273 id,
8274 "ok",
8275 crate::json::Value::Null,
8276 );
8277
8278 self.invalidate_result_cache();
8279 Ok(RuntimeQueryResult::ok_message(
8280 query.to_string(),
8281 &format!("policy `{id}` dropped"),
8282 "drop_iam_policy",
8283 ))
8284 }
8285
8286 fn execute_attach_policy(
8287 &self,
8288 query: &str,
8289 policy_id: &str,
8290 principal: &crate::storage::query::ast::PolicyPrincipalRef,
8291 ) -> RedDBResult<RuntimeQueryResult> {
8292 use crate::auth::store::PrincipalRef;
8293 use crate::auth::UserId;
8294 use crate::storage::query::ast::PolicyPrincipalRef;
8295
8296 let auth_store = self
8297 .inner
8298 .auth_store
8299 .read()
8300 .clone()
8301 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8302 let p = match principal {
8303 PolicyPrincipalRef::User(u) => {
8304 PrincipalRef::User(UserId::from_parts(u.tenant.as_deref(), &u.username))
8305 }
8306 PolicyPrincipalRef::Group(g) => PrincipalRef::Group(g.clone()),
8307 };
8308 let pretty_target = principal_label(principal);
8309 auth_store
8310 .attach_policy(p, policy_id)
8311 .map_err(|e| RedDBError::Query(e.to_string()))?;
8312
8313 let principal_str = current_auth_identity()
8314 .map(|(u, _)| u)
8315 .unwrap_or_else(|| "anonymous".into());
8316 tracing::info!(
8317 target: "audit",
8318 principal = %principal_str,
8319 action = "iam:policy.attach",
8320 matched_policy_id = %policy_id,
8321 target = %pretty_target,
8322 "ATTACH POLICY applied"
8323 );
8324 self.inner.audit_log.record(
8325 "iam/policy.attach",
8326 &principal_str,
8327 &pretty_target,
8328 "ok",
8329 crate::json::Value::Null,
8330 );
8331
8332 self.invalidate_result_cache();
8333 Ok(RuntimeQueryResult::ok_message(
8334 query.to_string(),
8335 &format!("policy `{policy_id}` attached to {pretty_target}"),
8336 "attach_policy",
8337 ))
8338 }
8339
8340 fn execute_detach_policy(
8341 &self,
8342 query: &str,
8343 policy_id: &str,
8344 principal: &crate::storage::query::ast::PolicyPrincipalRef,
8345 ) -> RedDBResult<RuntimeQueryResult> {
8346 use crate::auth::store::PrincipalRef;
8347 use crate::auth::UserId;
8348 use crate::storage::query::ast::PolicyPrincipalRef;
8349
8350 let auth_store = self
8351 .inner
8352 .auth_store
8353 .read()
8354 .clone()
8355 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8356 let p = match principal {
8357 PolicyPrincipalRef::User(u) => {
8358 PrincipalRef::User(UserId::from_parts(u.tenant.as_deref(), &u.username))
8359 }
8360 PolicyPrincipalRef::Group(g) => PrincipalRef::Group(g.clone()),
8361 };
8362 let pretty_target = principal_label(principal);
8363 auth_store
8364 .detach_policy(p, policy_id)
8365 .map_err(|e| RedDBError::Query(e.to_string()))?;
8366
8367 let principal_str = current_auth_identity()
8368 .map(|(u, _)| u)
8369 .unwrap_or_else(|| "anonymous".into());
8370 tracing::info!(
8371 target: "audit",
8372 principal = %principal_str,
8373 action = "iam:policy.detach",
8374 matched_policy_id = %policy_id,
8375 target = %pretty_target,
8376 "DETACH POLICY applied"
8377 );
8378 self.inner.audit_log.record(
8379 "iam/policy.detach",
8380 &principal_str,
8381 &pretty_target,
8382 "ok",
8383 crate::json::Value::Null,
8384 );
8385
8386 self.invalidate_result_cache();
8387 Ok(RuntimeQueryResult::ok_message(
8388 query.to_string(),
8389 &format!("policy `{policy_id}` detached from {pretty_target}"),
8390 "detach_policy",
8391 ))
8392 }
8393
8394 fn execute_show_policies(
8395 &self,
8396 query: &str,
8397 filter: Option<&crate::storage::query::ast::PolicyPrincipalRef>,
8398 ) -> RedDBResult<RuntimeQueryResult> {
8399 use crate::auth::UserId;
8400 use crate::storage::query::ast::PolicyPrincipalRef;
8401 use crate::storage::query::unified::UnifiedRecord;
8402 use crate::storage::schema::Value as SchemaValue;
8403 use std::sync::Arc;
8404
8405 let auth_store = self
8406 .inner
8407 .auth_store
8408 .read()
8409 .clone()
8410 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8411
8412 let pols = match filter {
8413 None => auth_store.list_policies(),
8414 Some(PolicyPrincipalRef::User(u)) => {
8415 let id = UserId::from_parts(u.tenant.as_deref(), &u.username);
8416 auth_store.effective_policies(&id)
8417 }
8418 Some(PolicyPrincipalRef::Group(g)) => auth_store.group_policies(g),
8419 };
8420
8421 let mut records = Vec::with_capacity(pols.len());
8422 for p in pols.iter() {
8423 let mut rec = UnifiedRecord::default();
8424 rec.set_arc(Arc::from("id"), SchemaValue::text(p.id.clone()));
8425 rec.set_arc(
8426 Arc::from("statements"),
8427 SchemaValue::Integer(p.statements.len() as i64),
8428 );
8429 rec.set_arc(
8430 Arc::from("tenant"),
8431 p.tenant
8432 .as_deref()
8433 .map(|t| SchemaValue::text(t.to_string()))
8434 .unwrap_or(SchemaValue::Null),
8435 );
8436 rec.set_arc(Arc::from("json"), SchemaValue::text(p.to_json_string()));
8437 records.push(rec);
8438 }
8439 let mut result = crate::storage::query::unified::UnifiedResult::empty();
8440 result.records = records;
8441 Ok(RuntimeQueryResult {
8442 query: query.to_string(),
8443 mode: crate::storage::query::modes::QueryMode::Sql,
8444 statement: "show_policies",
8445 engine: "iam-policies",
8446 result,
8447 affected_rows: 0,
8448 statement_type: "select",
8449 })
8450 }
8451
8452 fn execute_show_effective_permissions(
8453 &self,
8454 query: &str,
8455 user: &crate::storage::query::ast::PolicyUserRef,
8456 resource: Option<&crate::storage::query::ast::PolicyResourceRef>,
8457 ) -> RedDBResult<RuntimeQueryResult> {
8458 use crate::auth::UserId;
8459 use crate::storage::query::unified::UnifiedRecord;
8460 use crate::storage::schema::Value as SchemaValue;
8461 use std::sync::Arc;
8462
8463 let auth_store = self
8464 .inner
8465 .auth_store
8466 .read()
8467 .clone()
8468 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8469 let id = UserId::from_parts(user.tenant.as_deref(), &user.username);
8470 let pols = auth_store.effective_policies(&id);
8471
8472 let mut records = Vec::new();
8475 for p in pols.iter() {
8476 for (idx, st) in p.statements.iter().enumerate() {
8477 if let Some(_r) = resource {
8478 }
8482 let mut rec = UnifiedRecord::default();
8483 rec.set_arc(Arc::from("policy_id"), SchemaValue::text(p.id.clone()));
8484 rec.set_arc(
8485 Arc::from("statement_index"),
8486 SchemaValue::Integer(idx as i64),
8487 );
8488 rec.set_arc(
8489 Arc::from("sid"),
8490 st.sid
8491 .as_deref()
8492 .map(|s| SchemaValue::text(s.to_string()))
8493 .unwrap_or(SchemaValue::Null),
8494 );
8495 rec.set_arc(
8496 Arc::from("effect"),
8497 SchemaValue::text(match st.effect {
8498 crate::auth::policies::Effect::Allow => "allow",
8499 crate::auth::policies::Effect::Deny => "deny",
8500 }),
8501 );
8502 rec.set_arc(
8503 Arc::from("actions"),
8504 SchemaValue::Integer(st.actions.len() as i64),
8505 );
8506 rec.set_arc(
8507 Arc::from("resources"),
8508 SchemaValue::Integer(st.resources.len() as i64),
8509 );
8510 records.push(rec);
8511 }
8512 }
8513 let mut result = crate::storage::query::unified::UnifiedResult::empty();
8514 result.records = records;
8515 Ok(RuntimeQueryResult {
8516 query: query.to_string(),
8517 mode: crate::storage::query::modes::QueryMode::Sql,
8518 statement: "show_effective_permissions",
8519 engine: "iam-policies",
8520 result,
8521 affected_rows: 0,
8522 statement_type: "select",
8523 })
8524 }
8525
8526 fn execute_simulate_policy(
8527 &self,
8528 query: &str,
8529 user: &crate::storage::query::ast::PolicyUserRef,
8530 action: &str,
8531 resource: &crate::storage::query::ast::PolicyResourceRef,
8532 ) -> RedDBResult<RuntimeQueryResult> {
8533 use crate::auth::policies::ResourceRef;
8534 use crate::auth::store::SimCtx;
8535 use crate::auth::UserId;
8536 use crate::storage::query::unified::UnifiedRecord;
8537 use crate::storage::schema::Value as SchemaValue;
8538 use std::sync::Arc;
8539
8540 let auth_store = self
8541 .inner
8542 .auth_store
8543 .read()
8544 .clone()
8545 .ok_or_else(|| RedDBError::Query("auth store not configured".to_string()))?;
8546 let id = UserId::from_parts(user.tenant.as_deref(), &user.username);
8547 let r = ResourceRef::new(resource.kind.clone(), resource.name.clone());
8548 let outcome = auth_store.simulate(&id, action, &r, SimCtx::default());
8549
8550 let principal_str = current_auth_identity()
8551 .map(|(u, _)| u)
8552 .unwrap_or_else(|| "anonymous".into());
8553 let (decision_str, matched_pid, matched_sid) = decision_to_strings(&outcome.decision);
8554 tracing::info!(
8555 target: "audit",
8556 principal = %principal_str,
8557 action = "iam:policy.simulate",
8558 decision = %decision_str,
8559 matched_policy_id = ?matched_pid,
8560 matched_sid = ?matched_sid,
8561 "SIMULATE issued"
8562 );
8563 self.inner.audit_log.record(
8564 "iam/policy.simulate",
8565 &principal_str,
8566 &id.to_string(),
8567 "ok",
8568 crate::json::Value::Null,
8569 );
8570
8571 let mut rec = UnifiedRecord::default();
8572 rec.set_arc(Arc::from("decision"), SchemaValue::text(decision_str));
8573 rec.set_arc(
8574 Arc::from("matched_policy_id"),
8575 matched_pid
8576 .map(SchemaValue::text)
8577 .unwrap_or(SchemaValue::Null),
8578 );
8579 rec.set_arc(
8580 Arc::from("matched_sid"),
8581 matched_sid
8582 .map(SchemaValue::text)
8583 .unwrap_or(SchemaValue::Null),
8584 );
8585 rec.set_arc(Arc::from("reason"), SchemaValue::text(outcome.reason));
8586 rec.set_arc(
8587 Arc::from("trail_len"),
8588 SchemaValue::Integer(outcome.trail.len() as i64),
8589 );
8590 let mut result = crate::storage::query::unified::UnifiedResult::empty();
8591 result.records = vec![rec];
8592 Ok(RuntimeQueryResult {
8593 query: query.to_string(),
8594 mode: crate::storage::query::modes::QueryMode::Sql,
8595 statement: "simulate_policy",
8596 engine: "iam-policies",
8597 result,
8598 affected_rows: 0,
8599 statement_type: "select",
8600 })
8601 }
8602}
8603
8604fn grant_to_iam_policy(
8609 principal: &crate::auth::privileges::GrantPrincipal,
8610 resource: &crate::auth::privileges::Resource,
8611 actions: &[crate::auth::privileges::Action],
8612 tenant: Option<&str>,
8613) -> Option<crate::auth::policies::Policy> {
8614 use crate::auth::policies::{
8615 compile_action, ActionPattern, Effect, Policy, ResourcePattern, Statement,
8616 };
8617 use crate::auth::privileges::{Action, GrantPrincipal, Resource};
8618
8619 if matches!(principal, GrantPrincipal::Group(_)) {
8620 return None;
8621 }
8622
8623 let now = crate::auth::now_ms();
8624 let id = format!("_grant_{:x}_{:x}", now, std::process::id());
8625
8626 let resource_str = match resource {
8627 Resource::Database => "table:*".to_string(),
8628 Resource::Schema(s) => format!("table:{s}.*"),
8629 Resource::Table { schema, table } => match schema {
8630 Some(s) => format!("table:{s}.{table}"),
8631 None => format!("table:{table}"),
8632 },
8633 Resource::Function { schema, name } => match schema {
8634 Some(s) => format!("function:{s}.{name}"),
8635 None => format!("function:{name}"),
8636 },
8637 };
8638
8639 let action_patterns: Vec<ActionPattern> = if actions.contains(&Action::All) {
8643 vec![ActionPattern::Wildcard]
8644 } else {
8645 actions
8646 .iter()
8647 .map(|a| compile_action(&a.as_str().to_ascii_lowercase()))
8648 .collect()
8649 };
8650 if action_patterns.is_empty() {
8651 return None;
8652 }
8653
8654 let resource_patterns = if resource_str == "*" {
8659 vec![ResourcePattern::Wildcard]
8660 } else if resource_str.contains('*') {
8661 vec![ResourcePattern::Glob(resource_str.clone())]
8662 } else if let Some((kind, name)) = resource_str.split_once(':') {
8663 vec![ResourcePattern::Exact {
8664 kind: kind.to_string(),
8665 name: name.to_string(),
8666 }]
8667 } else {
8668 vec![ResourcePattern::Wildcard]
8669 };
8670
8671 let policy = Policy {
8672 id,
8673 version: 1,
8674 tenant: tenant.map(|t| t.to_string()),
8675 created_at: now,
8676 updated_at: now,
8677 statements: vec![Statement {
8678 sid: None,
8679 effect: Effect::Allow,
8680 actions: action_patterns,
8681 resources: resource_patterns,
8682 condition: None,
8683 }],
8684 };
8685 if policy.validate().is_err() {
8686 return None;
8687 }
8688 Some(policy)
8689}
8690
8691fn legacy_action_to_iam(action: crate::auth::privileges::Action) -> &'static str {
8692 use crate::auth::privileges::Action;
8693 match action {
8694 Action::Select => "select",
8695 Action::Insert => "insert",
8696 Action::Update => "update",
8697 Action::Delete => "delete",
8698 Action::Truncate => "truncate",
8699 Action::References => "references",
8700 Action::Execute => "execute",
8701 Action::Usage => "usage",
8702 Action::All => "*",
8703 }
8704}
8705
8706fn update_set_target_columns(query: &crate::storage::query::ast::UpdateQuery) -> Vec<String> {
8707 let mut columns = Vec::new();
8708 for (column, _) in &query.assignment_exprs {
8709 if !columns.iter().any(|seen| seen == column) {
8710 columns.push(column.clone());
8711 }
8712 }
8713 columns
8714}
8715
8716fn column_access_request_for_table_update(
8717 table_name: &str,
8718 columns: Vec<String>,
8719) -> crate::auth::ColumnAccessRequest {
8720 match table_name.split_once('.') {
8721 Some((schema, table)) => {
8722 crate::auth::ColumnAccessRequest::update(table.to_string(), columns)
8723 .with_schema(schema.to_string())
8724 }
8725 None => crate::auth::ColumnAccessRequest::update(table_name.to_string(), columns),
8726 }
8727}
8728
8729fn requested_table_columns_for_policy(
8730 table: &crate::storage::query::ast::TableQuery,
8731) -> Vec<String> {
8732 use crate::storage::query::sql_lowering::{
8733 effective_table_filter, effective_table_group_by_exprs, effective_table_having_filter,
8734 effective_table_projections,
8735 };
8736
8737 let table_name = table.table.as_str();
8738 let table_alias = table.alias.as_deref();
8739 let mut columns = std::collections::BTreeSet::new();
8740
8741 for projection in effective_table_projections(table) {
8742 collect_projection_columns(&projection, table_name, table_alias, &mut columns);
8743 }
8744 if let Some(filter) = effective_table_filter(table) {
8745 collect_filter_columns(&filter, table_name, table_alias, &mut columns);
8746 }
8747 for expr in effective_table_group_by_exprs(table) {
8748 collect_expr_columns(&expr, table_name, table_alias, &mut columns);
8749 }
8750 if let Some(filter) = effective_table_having_filter(table) {
8751 collect_filter_columns(&filter, table_name, table_alias, &mut columns);
8752 }
8753 for order in &table.order_by {
8754 if let Some(expr) = order.expr.as_ref() {
8755 collect_expr_columns(expr, table_name, table_alias, &mut columns);
8756 } else {
8757 collect_field_ref_column(&order.field, table_name, table_alias, &mut columns);
8758 }
8759 }
8760
8761 columns.into_iter().collect()
8762}
8763
8764fn collect_projection_columns(
8765 projection: &crate::storage::query::ast::Projection,
8766 table_name: &str,
8767 table_alias: Option<&str>,
8768 columns: &mut std::collections::BTreeSet<String>,
8769) {
8770 use crate::storage::query::ast::Projection;
8771 match projection {
8772 Projection::All => {
8773 columns.insert("*".to_string());
8774 }
8775 Projection::Column(column) | Projection::Alias(column, _) => {
8776 if column != "*" {
8777 columns.insert(column.clone());
8778 }
8779 }
8780 Projection::Function(_, args) => {
8781 for arg in args {
8782 collect_projection_columns(arg, table_name, table_alias, columns);
8783 }
8784 }
8785 Projection::Expression(filter, _) => {
8786 collect_filter_columns(filter, table_name, table_alias, columns);
8787 }
8788 Projection::Field(field, _) => {
8789 collect_field_ref_column(field, table_name, table_alias, columns);
8790 }
8791 }
8792}
8793
8794fn collect_filter_columns(
8795 filter: &crate::storage::query::ast::Filter,
8796 table_name: &str,
8797 table_alias: Option<&str>,
8798 columns: &mut std::collections::BTreeSet<String>,
8799) {
8800 use crate::storage::query::ast::Filter;
8801 match filter {
8802 Filter::Compare { field, .. }
8803 | Filter::IsNull(field)
8804 | Filter::IsNotNull(field)
8805 | Filter::In { field, .. }
8806 | Filter::Between { field, .. }
8807 | Filter::Like { field, .. }
8808 | Filter::StartsWith { field, .. }
8809 | Filter::EndsWith { field, .. }
8810 | Filter::Contains { field, .. } => {
8811 collect_field_ref_column(field, table_name, table_alias, columns);
8812 }
8813 Filter::CompareFields { left, right, .. } => {
8814 collect_field_ref_column(left, table_name, table_alias, columns);
8815 collect_field_ref_column(right, table_name, table_alias, columns);
8816 }
8817 Filter::CompareExpr { lhs, rhs, .. } => {
8818 collect_expr_columns(lhs, table_name, table_alias, columns);
8819 collect_expr_columns(rhs, table_name, table_alias, columns);
8820 }
8821 Filter::And(left, right) | Filter::Or(left, right) => {
8822 collect_filter_columns(left, table_name, table_alias, columns);
8823 collect_filter_columns(right, table_name, table_alias, columns);
8824 }
8825 Filter::Not(inner) => collect_filter_columns(inner, table_name, table_alias, columns),
8826 }
8827}
8828
8829fn collect_expr_columns(
8830 expr: &crate::storage::query::ast::Expr,
8831 table_name: &str,
8832 table_alias: Option<&str>,
8833 columns: &mut std::collections::BTreeSet<String>,
8834) {
8835 use crate::storage::query::ast::Expr;
8836 match expr {
8837 Expr::Column { field, .. } => {
8838 collect_field_ref_column(field, table_name, table_alias, columns);
8839 }
8840 Expr::Literal { .. } | Expr::Parameter { .. } => {}
8841 Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
8842 collect_expr_columns(operand, table_name, table_alias, columns);
8843 }
8844 Expr::BinaryOp { lhs, rhs, .. } => {
8845 collect_expr_columns(lhs, table_name, table_alias, columns);
8846 collect_expr_columns(rhs, table_name, table_alias, columns);
8847 }
8848 Expr::FunctionCall { args, .. } => {
8849 for arg in args {
8850 collect_expr_columns(arg, table_name, table_alias, columns);
8851 }
8852 }
8853 Expr::Case {
8854 branches, else_, ..
8855 } => {
8856 for (condition, value) in branches {
8857 collect_expr_columns(condition, table_name, table_alias, columns);
8858 collect_expr_columns(value, table_name, table_alias, columns);
8859 }
8860 if let Some(value) = else_ {
8861 collect_expr_columns(value, table_name, table_alias, columns);
8862 }
8863 }
8864 Expr::IsNull { operand, .. } => {
8865 collect_expr_columns(operand, table_name, table_alias, columns);
8866 }
8867 Expr::InList { target, values, .. } => {
8868 collect_expr_columns(target, table_name, table_alias, columns);
8869 for value in values {
8870 collect_expr_columns(value, table_name, table_alias, columns);
8871 }
8872 }
8873 Expr::Between {
8874 target, low, high, ..
8875 } => {
8876 collect_expr_columns(target, table_name, table_alias, columns);
8877 collect_expr_columns(low, table_name, table_alias, columns);
8878 collect_expr_columns(high, table_name, table_alias, columns);
8879 }
8880 }
8881}
8882
8883fn collect_field_ref_column(
8884 field: &crate::storage::query::ast::FieldRef,
8885 table_name: &str,
8886 table_alias: Option<&str>,
8887 columns: &mut std::collections::BTreeSet<String>,
8888) {
8889 if let Some(column) = policy_column_name_from_field_ref(field, table_name, table_alias) {
8890 if column != "*" {
8891 columns.insert(column);
8892 }
8893 }
8894}
8895
8896fn policy_column_name_from_field_ref(
8897 field: &crate::storage::query::ast::FieldRef,
8898 table_name: &str,
8899 table_alias: Option<&str>,
8900) -> Option<String> {
8901 match field {
8902 crate::storage::query::ast::FieldRef::TableColumn { table, column } => {
8903 if column == "*" {
8904 return Some("*".to_string());
8905 }
8906 if table.is_empty() || table == table_name || Some(table.as_str()) == table_alias {
8907 Some(column.clone())
8908 } else {
8909 Some(format!("{table}.{column}"))
8910 }
8911 }
8912 _ => None,
8913 }
8914}
8915
8916fn legacy_resource_to_iam(
8917 resource: &crate::auth::privileges::Resource,
8918 tenant: Option<&str>,
8919) -> crate::auth::policies::ResourceRef {
8920 use crate::auth::privileges::Resource;
8921
8922 let (kind, name) = match resource {
8923 Resource::Database => ("database".to_string(), "*".to_string()),
8924 Resource::Schema(s) => ("schema".to_string(), format!("{s}.*")),
8925 Resource::Table { schema, table } => (
8926 "table".to_string(),
8927 match schema {
8928 Some(s) => format!("{s}.{table}"),
8929 None => table.clone(),
8930 },
8931 ),
8932 Resource::Function { schema, name } => (
8933 "function".to_string(),
8934 match schema {
8935 Some(s) => format!("{s}.{name}"),
8936 None => name.clone(),
8937 },
8938 ),
8939 };
8940
8941 let mut out = crate::auth::policies::ResourceRef::new(kind, name);
8942 if let Some(t) = tenant {
8943 out = out.with_tenant(t.to_string());
8944 }
8945 out
8946}
8947
/// One side (left or right) of a join whose input is a plain table, as built by
/// `table_side_context`.
#[derive(Debug)]
struct JoinTableSide {
    /// Base table name; this is the key under which join columns are recorded.
    table: String,
    /// Name the query uses to refer to the table. `table_side_context` sets
    /// it to the table name when the query gives no alias.
    alias: String,
}
8953
8954fn table_side_context(expr: &QueryExpr) -> Option<JoinTableSide> {
8955 match expr {
8956 QueryExpr::Table(table) => Some(JoinTableSide {
8957 table: table.table.clone(),
8958 alias: table.alias.clone().unwrap_or_else(|| table.table.clone()),
8959 }),
8960 _ => None,
8961 }
8962}
8963
8964fn collect_projection_columns_for_table(
8965 projection: &Projection,
8966 table: &str,
8967 alias: Option<&str>,
8968 out: &mut BTreeSet<String>,
8969) {
8970 match projection {
8971 Projection::Column(column) | Projection::Alias(column, _) => {
8972 match split_qualified_column(column) {
8973 Some((qualifier, column))
8974 if qualifier == table || alias.is_some_and(|alias| qualifier == alias) =>
8975 {
8976 push_policy_column(column, out);
8977 }
8978 Some(_) => {}
8979 None => push_policy_column(column, out),
8980 }
8981 }
8982 Projection::Field(
8983 FieldRef::TableColumn {
8984 table: qualifier,
8985 column,
8986 },
8987 _,
8988 ) => {
8989 if qualifier.is_empty()
8990 || qualifier == table
8991 || alias.is_some_and(|alias| qualifier == alias)
8992 {
8993 push_policy_column(column, out);
8994 }
8995 }
8996 Projection::Field(
8997 FieldRef::NodeProperty {
8998 alias: qualifier,
8999 property,
9000 },
9001 _,
9002 )
9003 | Projection::Field(
9004 FieldRef::EdgeProperty {
9005 alias: qualifier,
9006 property,
9007 },
9008 _,
9009 ) => {
9010 if qualifier == table || alias.is_some_and(|alias| qualifier == alias) {
9011 push_policy_column(property, out);
9012 }
9013 }
9014 Projection::Function(_, args) => {
9015 for arg in args {
9016 collect_projection_columns_for_table(arg, table, alias, out);
9017 }
9018 }
9019 Projection::Expression(_, _) | Projection::All | Projection::Field(_, _) => {}
9020 }
9021}
9022
9023fn collect_projection_columns_for_join_side(
9024 projection: &Projection,
9025 left: Option<&JoinTableSide>,
9026 right: Option<&JoinTableSide>,
9027 out: &mut HashMap<String, BTreeSet<String>>,
9028) -> RedDBResult<()> {
9029 match projection {
9030 Projection::Column(column) | Projection::Alias(column, _) => {
9031 if let Some((qualifier, column)) = split_qualified_column(column) {
9032 push_qualified_join_column(qualifier, column, left, right, out);
9033 } else {
9034 push_unqualified_join_column(column, left, right, out);
9035 }
9036 }
9037 Projection::Field(FieldRef::TableColumn { table, column }, _) => {
9038 if table.is_empty() {
9039 push_unqualified_join_column(column, left, right, out);
9040 } else if let Some(side) = [left, right]
9041 .into_iter()
9042 .flatten()
9043 .find(|side| table == side.table.as_str() || table == side.alias.as_str())
9044 {
9045 push_join_column(&side.table, column, out);
9046 }
9047 }
9048 Projection::Field(FieldRef::NodeProperty { alias, property }, _)
9049 | Projection::Field(FieldRef::EdgeProperty { alias, property }, _) => {
9050 push_qualified_join_column(alias, property, left, right, out);
9051 }
9052 Projection::Function(_, args) => {
9053 for arg in args {
9054 collect_projection_columns_for_join_side(arg, left, right, out)?;
9055 }
9056 }
9057 Projection::Expression(_, _) | Projection::All | Projection::Field(_, _) => {}
9058 }
9059 Ok(())
9060}
9061
/// Splits a `qualifier.column` name at its first dot.
///
/// Returns `None` in these cases:
/// * there is no dot;
/// * either half is empty;
/// * the column half contains another dot (e.g. `a.b.c`).
fn split_qualified_column(column: &str) -> Option<(&str, &str)> {
    column
        .split_once('.')
        .filter(|(head, tail)| !head.is_empty() && !tail.is_empty() && !tail.contains('.'))
}
9069
9070fn push_qualified_join_column(
9071 qualifier: &str,
9072 column: &str,
9073 left: Option<&JoinTableSide>,
9074 right: Option<&JoinTableSide>,
9075 out: &mut HashMap<String, BTreeSet<String>>,
9076) {
9077 if let Some(side) = [left, right]
9078 .into_iter()
9079 .flatten()
9080 .find(|side| qualifier == side.table.as_str() || qualifier == side.alias.as_str())
9081 {
9082 push_join_column(&side.table, column, out);
9083 }
9084}
9085
9086fn push_unqualified_join_column(
9087 column: &str,
9088 left: Option<&JoinTableSide>,
9089 right: Option<&JoinTableSide>,
9090 out: &mut HashMap<String, BTreeSet<String>>,
9091) {
9092 for side in [left, right].into_iter().flatten() {
9093 push_join_column(&side.table, column, out);
9094 }
9095}
9096
9097fn push_join_column(table: &str, column: &str, out: &mut HashMap<String, BTreeSet<String>>) {
9098 if is_policy_column_name(column) {
9099 out.entry(table.to_string())
9100 .or_default()
9101 .insert(column.to_string());
9102 }
9103}
9104
9105fn push_policy_column(column: &str, out: &mut BTreeSet<String>) {
9106 if is_policy_column_name(column) {
9107 out.insert(column.to_string());
9108 }
9109}
9110
/// Reports whether `column` is a real column name that column policies
/// should see.
///
/// Rejected names:
/// * the empty string;
/// * the bare `*` wildcard;
/// * pseudo-columns prefixed with `LIT:` or `TYPE:`.
fn is_policy_column_name(column: &str) -> bool {
    const PSEUDO_PREFIXES: [&str; 2] = ["LIT:", "TYPE:"];

    let rejected = column.is_empty()
        || column == "*"
        || PSEUDO_PREFIXES
            .iter()
            .any(|prefix| column.starts_with(*prefix));
    !rejected
}
9117
9118fn runtime_iam_context(
9119 role: crate::auth::Role,
9120 tenant: Option<&str>,
9121) -> crate::auth::policies::EvalContext {
9122 crate::auth::policies::EvalContext {
9123 principal_tenant: tenant.map(|t| t.to_string()),
9124 current_tenant: tenant.map(|t| t.to_string()),
9125 peer_ip: None,
9126 mfa_present: false,
9127 now_ms: crate::auth::now_ms(),
9128 principal_is_admin_role: role == crate::auth::Role::Admin,
9129 }
9130}
9131
9132fn explicit_table_projection_columns(
9133 query: &crate::storage::query::ast::TableQuery,
9134) -> Vec<String> {
9135 use crate::storage::query::ast::{FieldRef, Projection};
9136
9137 let mut columns = Vec::new();
9138 for projection in crate::storage::query::sql_lowering::effective_table_projections(query) {
9139 match projection {
9140 Projection::Column(column) | Projection::Alias(column, _) => {
9141 push_unique(&mut columns, column)
9142 }
9143 Projection::Field(FieldRef::TableColumn { column, .. }, _) => {
9144 push_unique(&mut columns, column)
9145 }
9146 _ => {}
9150 }
9151 }
9152 columns
9153}
9154
9155fn explicit_graph_projection_properties(
9156 query: &crate::storage::query::ast::GraphQuery,
9157) -> Vec<String> {
9158 use crate::storage::query::ast::{FieldRef, Projection};
9159
9160 let mut columns = Vec::new();
9161 for projection in &query.return_ {
9162 match projection {
9163 Projection::Field(FieldRef::NodeProperty { property, .. }, _)
9164 | Projection::Field(FieldRef::EdgeProperty { property, .. }, _) => {
9165 push_unique(&mut columns, property.clone())
9166 }
9167 _ => {}
9168 }
9169 }
9170 columns
9171}
9172
/// Appends `column` unless an equal entry is already present, which keeps
/// first-seen order.
fn push_unique(columns: &mut Vec<String>, column: String) {
    if columns.contains(&column) {
        return;
    }
    columns.push(column);
}
9178
9179fn principal_label(p: &crate::storage::query::ast::PolicyPrincipalRef) -> String {
9180 use crate::storage::query::ast::PolicyPrincipalRef;
9181 match p {
9182 PolicyPrincipalRef::User(u) => match &u.tenant {
9183 Some(t) => format!("user:{t}/{}", u.username),
9184 None => format!("user:{}", u.username),
9185 },
9186 PolicyPrincipalRef::Group(g) => format!("group:{g}"),
9187 }
9188}
9189
9190pub(crate) fn decision_to_strings(
9193 d: &crate::auth::policies::Decision,
9194) -> (String, Option<String>, Option<String>) {
9195 use crate::auth::policies::Decision;
9196 match d {
9197 Decision::Allow {
9198 matched_policy_id,
9199 matched_sid,
9200 } => (
9201 "allow".into(),
9202 Some(matched_policy_id.clone()),
9203 matched_sid.clone(),
9204 ),
9205 Decision::Deny {
9206 matched_policy_id,
9207 matched_sid,
9208 } => (
9209 "deny".into(),
9210 Some(matched_policy_id.clone()),
9211 matched_sid.clone(),
9212 ),
9213 Decision::DefaultDeny => ("default_deny".into(), None, None),
9214 Decision::AdminBypass => ("admin_bypass".into(), None, None),
9215 }
9216}
9217
/// Parses a timestamp literal into milliseconds since the Unix epoch.
///
/// Accepted forms:
/// * a bare unsigned integer, returned unchanged (treated as milliseconds);
/// * a string whose first whitespace-separated token is a calendar date
///   `YYYY-MM-DD`. The date is interpreted as midnight of that day on the
///   proleptic Gregorian calendar. Anything after the first whitespace (e.g.
///   a time of day) is ignored.
///
/// Returns `None` in these cases:
/// * malformed input;
/// * an impossible calendar date (`2024-13-01`, `2023-02-29`, `2024-03-00`);
/// * a year outside `u32`;
/// * a date before 1970-01-01, which has no unsigned millisecond value.
fn parse_timestamp_to_ms(s: &str) -> Option<u128> {
    const MS_PER_DAY: u128 = 86_400_000;

    /// Number of days in `month` of `year`, or `None` for an invalid month.
    fn days_in_month(year: u32, month: u32) -> Option<u32> {
        let leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
        match month {
            1 | 3 | 5 | 7 | 8 | 10 | 12 => Some(31),
            4 | 6 | 9 | 11 => Some(30),
            2 => Some(if leap { 29 } else { 28 }),
            _ => None,
        }
    }

    if let Ok(n) = s.parse::<u128>() {
        return Some(n);
    }

    let date = s.split_whitespace().next()?;
    let mut parts = date.split('-');
    let (y, m, d) = match (parts.next(), parts.next(), parts.next(), parts.next()) {
        (Some(y), Some(m), Some(d), None) => (y, m, d),
        _ => return None,
    };
    // Parsing the year as u32 bounds it, so the day arithmetic below cannot
    // overflow i64 and the final millisecond product fits in u128.
    let y = y.parse::<u32>().ok()?;
    let m = m.parse::<u32>().ok()?;
    let d = d.parse::<u32>().ok()?;
    if d == 0 || d > days_in_month(y, m)? {
        return None;
    }

    let days = days_from_civil(i64::from(y), m, d);
    // Pre-epoch dates give a negative day count; reject them instead of
    // letting a sign-extending cast wrap into a huge value.
    let days = u128::try_from(days).ok()?;
    Some(days * MS_PER_DAY)
}

/// Converts a proleptic-Gregorian civil date into the number of days since
/// 1970-01-01 (negative for earlier dates). Uses Howard Hinnant's
/// `days_from_civil` algorithm.
///
/// `m` is expected in `1..=12` and `d` in `1..=31`. Other values give a
/// meaningless but non-panicking result. All intermediates are `i64`, so a
/// zero day no longer underflows an unsigned temporary.
fn days_from_civil(y: i64, m: u32, d: u32) -> i64 {
    // Count years from March so February (and its leap day) falls last.
    let y = if m <= 2 { y - 1 } else { y };
    // Floor division: eras are 400-year Gregorian cycles.
    let era = y.div_euclid(400);
    let yoe = y - era * 400; // year of era, [0, 399]
    let mp = if m > 2 {
        i64::from(m) - 3
    } else {
        i64::from(m) + 9
    }; // month index with March = 0
    let doy = (153 * mp + 2) / 5 + i64::from(d) - 1; // day of year, [0, 365]
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // day of era, [0, 146096]
    era * 146_097 + doe - 719_468
}
9252
9253fn walk_plan_node(
9254 node: &crate::storage::query::planner::CanonicalLogicalNode,
9255 depth: usize,
9256 out: &mut Vec<crate::storage::query::unified::UnifiedRecord>,
9257) {
9258 use std::sync::Arc;
9259 let mut rec = crate::storage::query::unified::UnifiedRecord::default();
9260 rec.set_arc(Arc::from("op"), Value::text(node.operator.clone()));
9261 rec.set_arc(
9262 Arc::from("source"),
9263 node.source.clone().map(Value::text).unwrap_or(Value::Null),
9264 );
9265 rec.set_arc(Arc::from("est_rows"), Value::Float(node.estimated_rows));
9266 rec.set_arc(Arc::from("est_cost"), Value::Float(node.operator_cost));
9267 rec.set_arc(Arc::from("depth"), Value::Integer(depth as i64));
9268 out.push(rec);
9269 for child in &node.children {
9270 walk_plan_node(child, depth + 1, out);
9271 }
9272}