1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Returns the vault KV key used for a collection's dedicated master key,
/// i.e. `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 crate::reserved_fields::ensure_no_reserved_public_item_fields(
37 query.columns.iter().map(|column| column.name.as_str()),
38 &format!("table '{}'", query.name),
39 )?;
40 let exists = store.get_collection(&query.name).is_some();
42 if exists {
43 if query.if_not_exists {
44 return Ok(RuntimeQueryResult::ok_message(
45 raw_query.to_string(),
46 &format!("table '{}' already exists", query.name),
47 "create",
48 ));
49 }
50 return Err(RedDBError::Query(format!(
51 "table '{}' already exists",
52 query.name
53 )));
54 }
55
56 let contract = collection_contract_from_create_table(query)?;
59 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60 store
62 .create_collection(&query.name)
63 .map_err(|err| RedDBError::Internal(err.to_string()))?;
64 for subscription in &contract.subscriptions {
65 ensure_event_target_queue(self, &subscription.target_queue)?;
66 }
67 if let Some(default_ttl_ms) = query.default_ttl_ms {
68 self.inner
69 .db
70 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71 }
72 self.inner
73 .db
74 .save_collection_contract(contract)
75 .map_err(|err| RedDBError::Internal(err.to_string()))?;
76 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77 store.set_config_tree(
78 &format!("red.collection_tenants.{}", query.name),
79 &crate::serde_json::Value::String(tenant_id),
80 );
81 }
82 self.inner
83 .db
84 .persist_metadata()
85 .map_err(|err| RedDBError::Internal(err.to_string()))?;
86 self.refresh_table_planner_stats(&query.name);
87 self.invalidate_result_cache();
88 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99 if let Some(spec) = &query.partition_by {
106 let kind_str = match spec.kind {
107 crate::storage::query::ast::PartitionKind::Range => "range",
108 crate::storage::query::ast::PartitionKind::List => "list",
109 crate::storage::query::ast::PartitionKind::Hash => "hash",
110 };
111 store.set_config_tree(
112 &format!("partition.{}.by", query.name),
113 &crate::serde_json::Value::String(kind_str.to_string()),
114 );
115 store.set_config_tree(
116 &format!("partition.{}.column", query.name),
117 &crate::serde_json::Value::String(spec.column.clone()),
118 );
119 }
120
121 if let Some(col) = &query.tenant_by {
132 store.set_config_tree(
133 &format!("tenant_tables.{}.column", query.name),
134 &crate::serde_json::Value::String(col.clone()),
135 );
136 self.register_tenant_table(&query.name, col);
137 }
138
139 let ttl_suffix = query
140 .default_ttl_ms
141 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142 .unwrap_or_default();
143
144 let tenant_suffix = query
145 .tenant_by
146 .as_ref()
147 .map(|col| format!(" (tenant-scoped by {col})"))
148 .unwrap_or_default();
149
150 Ok(RuntimeQueryResult::ok_message(
151 raw_query.to_string(),
152 &format!(
153 "table '{}' created{}{}",
154 query.name, ttl_suffix, tenant_suffix
155 ),
156 "create",
157 ))
158 }
159
160 fn execute_create_keyed_collection(
161 &self,
162 raw_query: &str,
163 query: &CreateTableQuery,
164 ) -> RedDBResult<RuntimeQueryResult> {
165 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166 if is_system_schema_name(&query.name) {
167 return Err(RedDBError::Query("system schema is read-only".to_string()));
168 }
169 let store = self.inner.db.store();
170 let label = polymorphic_resolver::model_name(query.collection_model);
171 if store.get_collection(&query.name).is_some() {
172 if query.if_not_exists {
173 return Ok(RuntimeQueryResult::ok_message(
174 raw_query.to_string(),
175 &format!("{label} '{}' already exists", query.name),
176 "create",
177 ));
178 }
179 return Err(RedDBError::Query(format!(
180 "{label} '{}' already exists",
181 query.name
182 )));
183 }
184
185 store
186 .create_collection(&query.name)
187 .map_err(|err| RedDBError::Internal(err.to_string()))?;
188 if query.collection_model == CollectionModel::Vault {
189 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190 let key_scope = if query.vault_own_master_key {
191 "own"
192 } else {
193 "cluster"
194 };
195 store.set_config_tree(
196 &format!("red.vault.{}.key_scope", query.name),
197 &crate::serde_json::Value::String(key_scope.to_string()),
198 );
199 store.set_config_tree(
200 &format!("red.vault.{}.status", query.name),
201 &crate::serde_json::Value::String("sealed".to_string()),
202 );
203 }
204 if query.collection_model == CollectionModel::Metrics {
205 for spec in &query.metrics_rollup_policies {
206 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207 .ok_or_else(|| {
208 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209 })?;
210 if policy.source != "raw" {
211 return Err(RedDBError::Query(format!(
212 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213 spec
214 )));
215 }
216 if !matches!(
217 policy.aggregation.as_str(),
218 "avg" | "sum" | "min" | "max" | "count"
219 ) {
220 return Err(RedDBError::Query(format!(
221 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222 spec
223 )));
224 }
225 }
226 if let Some(raw_retention_ms) = query.default_ttl_ms {
227 self.inner
228 .db
229 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230 store.set_config_tree(
231 &format!("red.metrics.{}.raw_retention_ms", query.name),
232 &crate::serde_json::Value::Number(raw_retention_ms as f64),
233 );
234 }
235 let tenant_identity = query
236 .tenant_by
237 .clone()
238 .unwrap_or_else(|| "current_tenant".to_string());
239 store.set_config_tree(
240 &format!("red.metrics.{}.tenant_identity", query.name),
241 &crate::serde_json::Value::String(tenant_identity),
242 );
243 store.set_config_tree(
244 &format!("red.metrics.{}.namespace", query.name),
245 &crate::serde_json::Value::String("default".to_string()),
246 );
247 if !query.metrics_rollup_policies.is_empty() {
248 store.set_config_tree(
249 &format!("red.metrics.{}.rollup_policies", query.name),
250 &crate::serde_json::Value::Array(
251 query
252 .metrics_rollup_policies
253 .iter()
254 .cloned()
255 .map(crate::serde_json::Value::String)
256 .collect(),
257 ),
258 );
259 }
260 }
261 let contract = if query.collection_model == CollectionModel::Metrics {
262 metrics_collection_contract(query)
263 } else {
264 keyed_collection_contract(&query.name, query.collection_model)
265 };
266 self.inner
267 .db
268 .save_collection_contract(contract)
269 .map_err(|err| RedDBError::Internal(err.to_string()))?;
270 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271 store.set_config_tree(
272 &format!("red.collection_tenants.{}", query.name),
273 &crate::serde_json::Value::String(tenant_id),
274 );
275 }
276 self.inner
277 .db
278 .persist_metadata()
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 self.invalidate_result_cache();
281
282 Ok(RuntimeQueryResult::ok_message(
283 raw_query.to_string(),
284 &format!("{label} '{}' created", query.name),
285 "create",
286 ))
287 }
288
289 pub fn execute_create_collection(
290 &self,
291 raw_query: &str,
292 query: &CreateCollectionQuery,
293 ) -> RedDBResult<RuntimeQueryResult> {
294 let model = match query.kind.as_str() {
295 "graph" => CollectionModel::Graph,
296 "document" => CollectionModel::Document,
297 "metrics" => CollectionModel::Metrics,
298 "blockchain" => CollectionModel::Table,
303 other => {
304 return Err(RedDBError::Query(format!(
305 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
306 )));
307 }
308 };
309 let create = CreateTableQuery {
310 collection_model: model,
311 name: query.name.clone(),
312 columns: Vec::new(),
313 if_not_exists: query.if_not_exists,
314 default_ttl_ms: None,
315 metrics_rollup_policies: Vec::new(),
316 context_index_fields: Vec::new(),
317 context_index_enabled: false,
318 timestamps: false,
319 partition_by: None,
320 tenant_by: None,
321 append_only: false,
322 subscriptions: Vec::new(),
323 vault_own_master_key: false,
324 };
325 let result = self.execute_create_table(raw_query, &create)?;
326 if query.kind == "blockchain" {
327 self.install_blockchain_kind(&query.name)?;
328 }
329 if !query.allowed_signers.is_empty() {
334 let actor = crate::runtime::impl_core::current_user_projected()
335 .unwrap_or_else(|| "@system/create-collection".to_string());
336 crate::runtime::signed_writes_kind::install(
337 &*self.inner.db.store(),
338 &query.name,
339 &query.allowed_signers,
340 &actor,
341 );
342 }
343 Ok(result)
344 }
345
346 fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
350 use crate::runtime::blockchain_kind;
351 use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
352 use std::sync::Arc;
353
354 let store = self.inner.db.store();
355 blockchain_kind::mark_as_chain(&*store, name);
356
357 let existing_tip = blockchain_kind::chain_tip(&*store, name);
358 if existing_tip.height.is_some() {
359 return Ok(());
360 }
361
362 let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
363 let named: std::collections::HashMap<String, crate::storage::schema::Value> =
364 fields.into_iter().collect();
365 let entity = UnifiedEntity::new(
366 EntityId::new(0),
367 EntityKind::TableRow {
368 table: Arc::from(name),
369 row_id: 0,
370 },
371 EntityData::Row(RowData {
372 columns: Vec::new(),
373 named: Some(named),
374 schema: None,
375 }),
376 );
377 store
378 .insert_auto(name, entity)
379 .map_err(|err| RedDBError::Internal(err.to_string()))?;
380 if let Some(tip) = blockchain_kind::chain_tip_full(&*store, name) {
383 self.inner
384 .chain_tip_cache
385 .lock()
386 .insert(name.to_string(), tip);
387 }
388 Ok(())
389 }
390
391 pub fn execute_create_vector(
392 &self,
393 raw_query: &str,
394 query: &CreateVectorQuery,
395 ) -> RedDBResult<RuntimeQueryResult> {
396 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
397 if is_system_schema_name(&query.name) {
398 return Err(RedDBError::Query("system schema is read-only".to_string()));
399 }
400 let store = self.inner.db.store();
401 if store.get_collection(&query.name).is_some() {
402 if query.if_not_exists {
403 return Ok(RuntimeQueryResult::ok_message(
404 raw_query.to_string(),
405 &format!("vector '{}' already exists", query.name),
406 "create",
407 ));
408 }
409 return Err(RedDBError::Query(format!(
410 "vector '{}' already exists",
411 query.name
412 )));
413 }
414
415 store
416 .create_collection(&query.name)
417 .map_err(|err| RedDBError::Internal(err.to_string()))?;
418 self.inner
419 .db
420 .save_collection_contract(vector_collection_contract(query))
421 .map_err(|err| RedDBError::Internal(err.to_string()))?;
422 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
423 store.set_config_tree(
424 &format!("red.collection_tenants.{}", query.name),
425 &crate::serde_json::Value::String(tenant_id),
426 );
427 }
428 self.inner
429 .db
430 .persist_metadata()
431 .map_err(|err| RedDBError::Internal(err.to_string()))?;
432 self.invalidate_result_cache();
433
434 Ok(RuntimeQueryResult::ok_message(
435 raw_query.to_string(),
436 &format!("vector '{}' created", query.name),
437 "create",
438 ))
439 }
440
441 fn provision_vault_key_material(
442 &self,
443 collection: &str,
444 own_master_key: bool,
445 ) -> RedDBResult<()> {
446 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
447 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
448 })?;
449 if !auth_store.is_vault_backed() {
450 return Err(RedDBError::Query(
451 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
452 ));
453 }
454
455 if auth_store.vault_secret_key().is_none() {
456 let key = crate::auth::store::random_bytes(32);
457 auth_store
458 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
459 .map_err(|err| RedDBError::Query(err.to_string()))?;
460 }
461
462 if own_master_key {
463 let key = crate::auth::store::random_bytes(32);
464 auth_store
465 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
466 .map_err(|err| RedDBError::Query(err.to_string()))?;
467 }
468
469 Ok(())
470 }
471
472 pub fn execute_drop_table(
476 &self,
477 raw_query: &str,
478 query: &DropTableQuery,
479 ) -> RedDBResult<RuntimeQueryResult> {
480 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
481 let store = self.inner.db.store();
482
483 if is_system_schema_name(&query.name) {
484 return Err(RedDBError::Query("system schema is read-only".to_string()));
485 }
486
487 let exists = store.get_collection(&query.name).is_some();
488 if !exists {
489 if query.if_exists {
490 return Ok(RuntimeQueryResult::ok_message(
491 raw_query.to_string(),
492 &format!("table '{}' does not exist", query.name),
493 "drop",
494 ));
495 }
496 return Err(RedDBError::NotFound(format!(
497 "table '{}' not found",
498 query.name
499 )));
500 }
501 let actual =
502 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
503 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
504
505 let final_count = store
508 .get_collection(&query.name)
509 .map(|manager| manager.query_all(|_| true).len() as u64)
510 .unwrap_or(0);
511 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
512 self,
513 &query.name,
514 final_count,
515 )?;
516
517 let orphaned_indices: Vec<String> = self
518 .inner
519 .index_store
520 .list_indices(&query.name)
521 .into_iter()
522 .map(|index| index.name)
523 .collect();
524 for name in &orphaned_indices {
525 self.inner.index_store.drop_index(name, &query.name);
526 }
527
528 store
529 .drop_collection(&query.name)
530 .map_err(|err| RedDBError::Internal(err.to_string()))?;
531 self.inner.db.invalidate_vector_index(&query.name);
532 self.inner.db.clear_collection_default_ttl_ms(&query.name);
533 self.inner
534 .db
535 .remove_collection_contract(&query.name)
536 .map_err(|err| RedDBError::Internal(err.to_string()))?;
537 self.clear_table_planner_stats(&query.name);
538 self.invalidate_result_cache();
539 if let Some(store) = self.inner.auth_store.read().clone() {
543 store.invalidate_visible_collections_cache();
544 }
545 self.inner
546 .db
547 .persist_metadata()
548 .map_err(|err| RedDBError::Internal(err.to_string()))?;
549 self.schema_vocabulary_apply(
555 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
556 collection: query.name.clone(),
557 },
558 );
559
560 Ok(RuntimeQueryResult::ok_message(
561 raw_query.to_string(),
562 &format!("table '{}' dropped", query.name),
563 "drop",
564 ))
565 }
566
567 pub fn execute_drop_graph(
568 &self,
569 raw_query: &str,
570 query: &DropGraphQuery,
571 ) -> RedDBResult<RuntimeQueryResult> {
572 self.execute_drop_typed_collection(
573 raw_query,
574 &query.name,
575 query.if_exists,
576 CollectionModel::Graph,
577 "graph",
578 )
579 }
580
581 pub fn execute_drop_vector(
582 &self,
583 raw_query: &str,
584 query: &DropVectorQuery,
585 ) -> RedDBResult<RuntimeQueryResult> {
586 self.execute_drop_typed_collection(
587 raw_query,
588 &query.name,
589 query.if_exists,
590 CollectionModel::Vector,
591 "vector",
592 )
593 }
594
595 pub fn execute_drop_document(
596 &self,
597 raw_query: &str,
598 query: &DropDocumentQuery,
599 ) -> RedDBResult<RuntimeQueryResult> {
600 self.execute_drop_typed_collection(
601 raw_query,
602 &query.name,
603 query.if_exists,
604 CollectionModel::Document,
605 "document",
606 )
607 }
608
609 pub fn execute_drop_kv(
610 &self,
611 raw_query: &str,
612 query: &DropKvQuery,
613 ) -> RedDBResult<RuntimeQueryResult> {
614 let label = polymorphic_resolver::model_name(query.model);
615 self.execute_drop_typed_collection(
616 raw_query,
617 &query.name,
618 query.if_exists,
619 query.model,
620 label,
621 )
622 }
623
624 pub fn execute_drop_collection(
625 &self,
626 raw_query: &str,
627 query: &DropCollectionQuery,
628 ) -> RedDBResult<RuntimeQueryResult> {
629 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
630 if is_system_schema_name(&query.name) {
631 return Err(RedDBError::Query("system schema is read-only".to_string()));
632 }
633 let store = self.inner.db.store();
634 if store.get_collection(&query.name).is_none() {
635 if query.if_exists {
636 return Ok(RuntimeQueryResult::ok_message(
637 raw_query.to_string(),
638 &format!("collection '{}' does not exist", query.name),
639 "drop",
640 ));
641 }
642 return Err(RedDBError::NotFound(format!(
643 "collection '{}' not found",
644 query.name
645 )));
646 }
647
648 let actual =
649 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
650 if let Some(expected) = query.model {
651 polymorphic_resolver::ensure_model_match(expected, actual)?;
652 }
653
654 match actual {
655 CollectionModel::Table => self.execute_drop_table(
656 raw_query,
657 &DropTableQuery {
658 name: query.name.clone(),
659 if_exists: query.if_exists,
660 },
661 ),
662 CollectionModel::TimeSeries => self.execute_drop_timeseries(
663 raw_query,
664 &DropTimeSeriesQuery {
665 name: query.name.clone(),
666 if_exists: query.if_exists,
667 },
668 ),
669 CollectionModel::Queue => self.execute_drop_queue(
670 raw_query,
671 &DropQueueQuery {
672 name: query.name.clone(),
673 if_exists: query.if_exists,
674 },
675 ),
676 CollectionModel::Graph => self.execute_drop_graph(
677 raw_query,
678 &DropGraphQuery {
679 name: query.name.clone(),
680 if_exists: query.if_exists,
681 },
682 ),
683 CollectionModel::Vector => self.execute_drop_vector(
684 raw_query,
685 &DropVectorQuery {
686 name: query.name.clone(),
687 if_exists: query.if_exists,
688 },
689 ),
690 CollectionModel::Document => self.execute_drop_document(
691 raw_query,
692 &DropDocumentQuery {
693 name: query.name.clone(),
694 if_exists: query.if_exists,
695 },
696 ),
697 CollectionModel::Kv => self.execute_drop_kv(
698 raw_query,
699 &DropKvQuery {
700 name: query.name.clone(),
701 if_exists: query.if_exists,
702 model: CollectionModel::Kv,
703 },
704 ),
705 CollectionModel::Config => self.execute_drop_kv(
706 raw_query,
707 &DropKvQuery {
708 name: query.name.clone(),
709 if_exists: query.if_exists,
710 model: CollectionModel::Config,
711 },
712 ),
713 CollectionModel::Vault => self.execute_drop_kv(
714 raw_query,
715 &DropKvQuery {
716 name: query.name.clone(),
717 if_exists: query.if_exists,
718 model: CollectionModel::Vault,
719 },
720 ),
721 CollectionModel::Hll => self.execute_probabilistic_command(
722 raw_query,
723 &ProbabilisticCommand::DropHll {
724 name: query.name.clone(),
725 if_exists: query.if_exists,
726 },
727 ),
728 CollectionModel::Sketch => self.execute_probabilistic_command(
729 raw_query,
730 &ProbabilisticCommand::DropSketch {
731 name: query.name.clone(),
732 if_exists: query.if_exists,
733 },
734 ),
735 CollectionModel::Filter => self.execute_probabilistic_command(
736 raw_query,
737 &ProbabilisticCommand::DropFilter {
738 name: query.name.clone(),
739 if_exists: query.if_exists,
740 },
741 ),
742 CollectionModel::Metrics => self.execute_drop_typed_collection(
743 raw_query,
744 &query.name,
745 query.if_exists,
746 CollectionModel::Metrics,
747 "metrics",
748 ),
749 CollectionModel::Mixed => self.execute_drop_typed_collection(
750 raw_query,
751 &query.name,
752 query.if_exists,
753 CollectionModel::Mixed,
754 "collection",
755 ),
756 }
757 }
758
    /// Executes `ALTER TABLE` / `ALTER COLLECTION`.
    ///
    /// The operations are processed in statement order. Each one may apply an
    /// immediate side effect (config-tree write, runtime registry update,
    /// signer change, validation) and contributes one human-readable fragment
    /// to the result message. Afterwards the collection contract is loaded
    /// (or a default one is synthesised), the whole operation list is applied
    /// to it via `apply_alter_operations_to_contract`, its version is bumped
    /// and it is saved. Finally planner stats and the result cache are
    /// cleared and the schema vocabulary is notified.
    ///
    /// NOTE(review): side effects inside the loop are applied as each
    /// operation is visited, so an error raised by a later operation returns
    /// early with the earlier operations' side effects already in place and
    /// without the contract being saved.
    ///
    /// # Errors
    ///
    /// * `RedDBError::NotFound` when the collection does not exist.
    /// * `RedDBError::Query` when a signer operation targets a collection
    ///   without a signer registry, or when `SET RETENTION` targets a
    ///   collection for which `retention_timestamp_column_exists` is false.
    /// * `RedDBError::Internal` when saving the contract fails.
    /// * Errors from the write gate, `vcs_set_versioned` and the
    ///   event-subscription helpers are propagated.
    pub fn execute_alter_table(
        &self,
        raw_query: &str,
        query: &AlterTableQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
        let store = self.inner.db.store();

        if store.get_collection(&query.name).is_none() {
            return Err(RedDBError::NotFound(format!(
                "table '{}' not found",
                query.name
            )));
        }

        // One message fragment per processed operation.
        let mut messages = Vec::new();

        // Column additions / removals are collected up front; they are only
        // used for the subscription schema-change operator event below.
        let fields_added: Vec<String> = query
            .operations
            .iter()
            .filter_map(|op| {
                if let AlterOperation::AddColumn(col) = op {
                    Some(col.name.clone())
                } else {
                    None
                }
            })
            .collect();
        let fields_removed: Vec<String> = query
            .operations
            .iter()
            .filter_map(|op| {
                if let AlterOperation::DropColumn(name) = op {
                    Some(name.clone())
                } else {
                    None
                }
            })
            .collect();

        for op in &query.operations {
            match op {
                // Column operations have no side effect in this loop; the
                // operation list is handed to
                // `apply_alter_operations_to_contract` further down.
                AlterOperation::AddColumn(col) => {
                    messages.push(format!("column '{}' added", col.name));
                }
                AlterOperation::DropColumn(name) => {
                    messages.push(format!("column '{}' dropped", name));
                }
                AlterOperation::RenameColumn { from, to } => {
                    messages.push(format!("column '{}' renamed to '{}'", from, to));
                }
                // Partition children are tracked as config-tree entries keyed
                // by parent and child; attaching stores the bound string.
                AlterOperation::AttachPartition { child, bound } => {
                    store.set_config_tree(
                        &format!("partition.{}.children.{}", query.name, child),
                        &crate::serde_json::Value::String(bound.clone()),
                    );
                    messages.push(format!(
                        "partition '{child}' attached to '{}' ({bound})",
                        query.name
                    ));
                }
                // Detaching overwrites the same entry with `Null`.
                AlterOperation::DetachPartition { child } => {
                    store.set_config_tree(
                        &format!("partition.{}.children.{}", query.name, child),
                        &crate::serde_json::Value::Null,
                    );
                    messages.push(format!(
                        "partition '{child}' detached from '{}'",
                        query.name
                    ));
                }
                // RLS state lives both in the in-memory set and in the config
                // tree; the plan cache is invalidated after the change.
                AlterOperation::EnableRowLevelSecurity => {
                    self.inner
                        .rls_enabled_tables
                        .write()
                        .insert(query.name.clone());
                    store.set_config_tree(
                        &format!("rls.enabled.{}", query.name),
                        &crate::serde_json::Value::Bool(true),
                    );
                    self.invalidate_plan_cache();
                    messages.push(format!("row level security enabled on '{}'", query.name));
                }
                AlterOperation::DisableRowLevelSecurity => {
                    self.inner.rls_enabled_tables.write().remove(&query.name);
                    store.set_config_tree(
                        &format!("rls.enabled.{}", query.name),
                        &crate::serde_json::Value::Null,
                    );
                    self.invalidate_plan_cache();
                    messages.push(format!("row level security disabled on '{}'", query.name));
                }
                // Tenancy: record the tenant column in the config tree and
                // (un)register the table with the runtime.
                AlterOperation::EnableTenancy { column } => {
                    store.set_config_tree(
                        &format!("tenant_tables.{}.column", query.name),
                        &crate::serde_json::Value::String(column.clone()),
                    );
                    self.register_tenant_table(&query.name, column);
                    self.invalidate_plan_cache();
                    messages.push(format!(
                        "tenancy enabled on '{}' by column '{column}'",
                        query.name
                    ));
                }
                AlterOperation::DisableTenancy => {
                    store.set_config_tree(
                        &format!("tenant_tables.{}.column", query.name),
                        &crate::serde_json::Value::Null,
                    );
                    self.unregister_tenant_table(&query.name);
                    self.invalidate_plan_cache();
                    messages.push(format!("tenancy disabled on '{}'", query.name));
                }
                // No side effect here; only the message is recorded.
                AlterOperation::SetAppendOnly(on) => {
                    messages.push(format!(
                        "append_only {} on '{}'",
                        if *on { "enabled" } else { "disabled" },
                        query.name
                    ));
                }
                AlterOperation::SetVersioned(on) => {
                    self.vcs_set_versioned(&query.name, *on)?;
                    messages.push(format!(
                        "versioned {} on '{}'",
                        if *on { "enabled" } else { "disabled" },
                        query.name
                    ));
                }
                // The subscription is validated against this collection as
                // its source, and its target queue is ensured, before the
                // operation is accepted.
                AlterOperation::EnableEvents(subscription) => {
                    let mut subscription = subscription.clone();
                    subscription.source = query.name.clone();
                    validate_event_subscriptions(
                        self,
                        &query.name,
                        std::slice::from_ref(&subscription),
                    )?;
                    ensure_event_target_queue(self, &subscription.target_queue)?;
                    messages.push(format!(
                        "events enabled on '{}' to '{}'",
                        query.name, subscription.target_queue
                    ));
                }
                // No side effect here; only the message is recorded.
                AlterOperation::DisableEvents => {
                    messages.push(format!("events disabled on '{}'", query.name));
                }
                // Same validation as `EnableEvents`, with the statement's
                // name overriding the descriptor's name.
                AlterOperation::AddSubscription { name, descriptor } => {
                    let mut sub = descriptor.clone();
                    sub.name = name.clone();
                    sub.source = query.name.clone();
                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
                    ensure_event_target_queue(self, &sub.target_queue)?;
                    messages.push(format!(
                        "subscription '{}' added on '{}' to '{}'",
                        name, query.name, sub.target_queue
                    ));
                }
                // No side effect here; only the message is recorded.
                AlterOperation::DropSubscription { name } => {
                    messages.push(format!(
                        "subscription '{}' dropped on '{}'",
                        name, query.name
                    ));
                }
                // Signers can only be changed on collections that already
                // have a signer registry.
                AlterOperation::AddSigner { pubkey } => {
                    if !crate::runtime::signed_writes_kind::is_signed(&*store, &query.name)
                    {
                        return Err(RedDBError::Query(format!(
                            "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
                             recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
                            query.name
                        )));
                    }
                    // Attribute the change to the current user, or to a
                    // synthetic system actor when none is projected.
                    let actor = crate::runtime::impl_core::current_user_projected()
                        .unwrap_or_else(|| "@system/alter".to_string());
                    let changed = crate::runtime::signed_writes_kind::add_signer(
                        &*store, &query.name, *pubkey, &actor,
                    );
                    messages.push(format!(
                        "signer {} on '{}'",
                        if changed { "added" } else { "already present" },
                        query.name
                    ));
                }
                AlterOperation::RevokeSigner { pubkey } => {
                    if !crate::runtime::signed_writes_kind::is_signed(&*store, &query.name)
                    {
                        return Err(RedDBError::Query(format!(
                            "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
                            query.name
                        )));
                    }
                    let actor = crate::runtime::impl_core::current_user_projected()
                        .unwrap_or_else(|| "@system/alter".to_string());
                    let changed = crate::runtime::signed_writes_kind::revoke_signer(
                        &*store, &query.name, pubkey, &actor,
                    );
                    messages.push(format!(
                        "signer {} on '{}'",
                        if changed { "revoked" } else { "already revoked" },
                        query.name
                    ));
                }
                // Retention is refused unless the existing contract passes
                // `retention_timestamp_column_exists`; a missing contract
                // counts as "no timestamp column".
                AlterOperation::SetRetention { duration_ms } => {
                    let existing = self.inner.db.collection_contract(&query.name);
                    let has_ts_column = existing
                        .as_ref()
                        .map(retention_timestamp_column_exists)
                        .unwrap_or(false);
                    if !has_ts_column {
                        return Err(RedDBError::Query(format!(
                            "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
                             column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
                             or enable WITH timestamps = true before setting a \
                             retention policy",
                            query.name
                        )));
                    }
                    messages.push(format!(
                        "retention set to {duration_ms} ms on '{}'",
                        query.name
                    ));
                }
                // No side effect here; only the message is recorded.
                AlterOperation::UnsetRetention => {
                    messages.push(format!("retention cleared on '{}'", query.name));
                }
            }
        }

        // Load the stored contract, or fall back to a default one for a
        // collection that has none, then apply every operation to it.
        let mut contract = self
            .inner
            .db
            .collection_contract(&query.name)
            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
        apply_alter_operations_to_contract(&mut contract, &query.operations);
        // Saturating add: the version never wraps around.
        contract.version = contract.version.saturating_add(1);
        contract.updated_at_unix_ms = current_unix_ms();
        self.inner
            .db
            .save_collection_contract(contract)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        // When columns were added or removed and the saved contract has
        // enabled subscriptions, emit an operator event naming them.
        if !fields_added.is_empty() || !fields_removed.is_empty() {
            let sub_names: Vec<String> = self
                .inner
                .db
                .collection_contract(&query.name)
                .map(|c| {
                    c.subscriptions
                        .iter()
                        .filter(|s| s.enabled)
                        .map(|s| s.name.clone())
                        .collect()
                })
                .unwrap_or_default();
            if !sub_names.is_empty() {
                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
                    collection: query.name.clone(),
                    subscription_names: sub_names.join(", "),
                    fields_added: fields_added.join(", "),
                    fields_removed: fields_removed.join(", "),
                    lsn: self.cdc_current_lsn(),
                }
                .emit_global();
            }
        }

        self.clear_table_planner_stats(&query.name);
        self.invalidate_result_cache();
        // Re-read the saved contract so the schema vocabulary sees the
        // declared columns as they are after the alteration.
        let post_alter_columns: Vec<String> = self
            .inner
            .db
            .collection_contract(&query.name)
            .map(|contract| {
                contract
                    .declared_columns
                    .iter()
                    .map(|col| col.name.clone())
                    .collect()
            })
            .unwrap_or_default();
        self.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
                collection: query.name.clone(),
                columns: post_alter_columns,
                type_tags: Vec::new(),
                description: None,
            },
        );

        let message = if messages.is_empty() {
            format!("table '{}' altered (no operations)", query.name)
        } else {
            format!("table '{}' altered: {}", query.name, messages.join(", "))
        };

        Ok(RuntimeQueryResult::ok_message(
            raw_query.to_string(),
            &message,
            "alter",
        ))
    }
1099
1100 pub fn execute_explain_alter(
1107 &self,
1108 raw_query: &str,
1109 query: &ExplainAlterQuery,
1110 ) -> RedDBResult<RuntimeQueryResult> {
1111 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1115
1116 let current_contract = self.inner.db.collection_contract(&query.target.name);
1117
1118 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1119 .as_ref()
1120 .map(|c| c.declared_columns.clone())
1121 .unwrap_or_default();
1122
1123 let diff = super::schema_diff::compute_column_diff(
1124 &query.target.name,
1125 ¤t_columns,
1126 &query.target.columns,
1127 );
1128
1129 let rendered = match query.format {
1130 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1131 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1132 };
1133
1134 let format_label = match query.format {
1135 ExplainFormat::Sql => "sql",
1136 ExplainFormat::Json => "json",
1137 };
1138
1139 let columns = vec![
1140 "table".to_string(),
1141 "format".to_string(),
1142 "diff".to_string(),
1143 ];
1144 let row = vec![
1145 ("table".to_string(), Value::text(query.target.name.clone())),
1146 ("format".to_string(), Value::text(format_label.to_string())),
1147 ("diff".to_string(), Value::text(rendered)),
1148 ];
1149
1150 Ok(RuntimeQueryResult::ok_records(
1151 raw_query.to_string(),
1152 columns,
1153 vec![row],
1154 "explain",
1155 ))
1156 }
1157
1158 pub fn execute_create_index(
1163 &self,
1164 raw_query: &str,
1165 query: &CreateIndexQuery,
1166 ) -> RedDBResult<RuntimeQueryResult> {
1167 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1168 let store = self.inner.db.store();
1169
1170 let manager = store
1172 .get_collection(&query.table)
1173 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1174
1175 let method_kind = match query.method {
1176 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1177 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1178 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1179 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1180 };
1181
1182 let entities = manager.query_all(|_| true);
1192 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1193 entities
1194 .iter()
1195 .map(|e| {
1196 let fields = match &e.data {
1197 crate::storage::EntityData::Row(row) => {
1198 if let Some(ref named) = row.named {
1199 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1200 } else if let Some(ref schema) = row.schema {
1201 schema
1205 .iter()
1206 .zip(row.columns.iter())
1207 .map(|(k, v)| (k.clone(), v.clone()))
1208 .collect()
1209 } else {
1210 Vec::new()
1211 }
1212 }
1213 crate::storage::EntityData::Node(node) => node
1214 .properties
1215 .iter()
1216 .map(|(k, v)| (k.clone(), v.clone()))
1217 .collect(),
1218 _ => Vec::new(),
1219 };
1220 (e.id, fields)
1221 })
1222 .collect();
1223
1224 let indexed_count = self
1226 .inner
1227 .index_store
1228 .create_index(
1229 &query.name,
1230 &query.table,
1231 &query.columns,
1232 method_kind,
1233 query.unique,
1234 &entity_fields,
1235 )
1236 .map_err(RedDBError::Internal)?;
1237
1238 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1239 &query.table,
1240 &entity_fields,
1241 );
1242 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1243 self.invalidate_plan_cache();
1244
1245 self.inner
1247 .index_store
1248 .register(super::index_store::RegisteredIndex {
1249 name: query.name.clone(),
1250 collection: query.table.clone(),
1251 columns: query.columns.clone(),
1252 method: method_kind,
1253 unique: query.unique,
1254 });
1255 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1259 collection: query.table.clone(),
1260 index: query.name.clone(),
1261 columns: query.columns.clone(),
1262 });
1263
1264 let method_str = format!("{}", query.method);
1265 let unique_str = if query.unique { "unique " } else { "" };
1266 let cols = query.columns.join(", ");
1267
1268 Ok(RuntimeQueryResult::ok_message(
1269 raw_query.to_string(),
1270 &format!(
1271 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1272 unique_str, query.name, query.table, cols, method_str, indexed_count
1273 ),
1274 "create",
1275 ))
1276 }
1277
1278 pub fn execute_drop_index(
1282 &self,
1283 raw_query: &str,
1284 query: &DropIndexQuery,
1285 ) -> RedDBResult<RuntimeQueryResult> {
1286 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1287 let store = self.inner.db.store();
1288
1289 if store.get_collection(&query.table).is_none() {
1291 if query.if_exists {
1292 return Ok(RuntimeQueryResult::ok_message(
1293 raw_query.to_string(),
1294 &format!("table '{}' does not exist", query.table),
1295 "drop",
1296 ));
1297 }
1298 return Err(RedDBError::NotFound(format!(
1299 "table '{}' not found",
1300 query.table
1301 )));
1302 }
1303
1304 self.inner.index_store.drop_index(&query.name, &query.table);
1306 self.invalidate_plan_cache();
1307 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1309 collection: query.table.clone(),
1310 index: query.name.clone(),
1311 });
1312
1313 Ok(RuntimeQueryResult::ok_message(
1314 raw_query.to_string(),
1315 &format!("index '{}' dropped from '{}'", query.name, query.table),
1316 "drop",
1317 ))
1318 }
1319
1320 fn execute_drop_typed_collection(
1321 &self,
1322 raw_query: &str,
1323 name: &str,
1324 if_exists: bool,
1325 expected_model: CollectionModel,
1326 label: &str,
1327 ) -> RedDBResult<RuntimeQueryResult> {
1328 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1329 if is_system_schema_name(name) {
1330 return Err(RedDBError::Query("system schema is read-only".to_string()));
1331 }
1332 let store = self.inner.db.store();
1333 if store.get_collection(name).is_none() {
1334 if if_exists {
1335 return Ok(RuntimeQueryResult::ok_message(
1336 raw_query.to_string(),
1337 &format!("{label} '{name}' does not exist"),
1338 "drop",
1339 ));
1340 }
1341 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1342 }
1343
1344 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1345 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1346 self.drop_collection_storage(raw_query, name, label)
1347 }
1348
1349 pub fn execute_truncate(
1350 &self,
1351 raw_query: &str,
1352 query: &TruncateQuery,
1353 ) -> RedDBResult<RuntimeQueryResult> {
1354 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1355 if is_system_schema_name(&query.name) {
1356 return Err(RedDBError::Query("system schema is read-only".to_string()));
1357 }
1358
1359 let label = query
1360 .model
1361 .map(polymorphic_resolver::model_name)
1362 .unwrap_or("collection");
1363 let store = self.inner.db.store();
1364 if store.get_collection(&query.name).is_none() {
1365 if query.if_exists {
1366 return Ok(RuntimeQueryResult::ok_message(
1367 raw_query.to_string(),
1368 &format!("{label} '{}' does not exist", query.name),
1369 "truncate",
1370 ));
1371 }
1372 return Err(RedDBError::NotFound(format!(
1373 "{label} '{}' not found",
1374 query.name
1375 )));
1376 }
1377
1378 let actual =
1379 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1380 if let Some(expected) = query.model {
1381 polymorphic_resolver::ensure_model_match(expected, actual)?;
1382 }
1383
1384 if actual == CollectionModel::Queue {
1385 return self.execute_queue_command(
1386 raw_query,
1387 &QueueCommand::Purge {
1388 queue: query.name.clone(),
1389 },
1390 );
1391 }
1392
1393 let affected = self.truncate_collection_entities(&query.name)?;
1395 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1397 self.inner.db.invalidate_vector_index(&query.name);
1398 self.clear_table_planner_stats(&query.name);
1399 self.invalidate_result_cache();
1400
1401 Ok(RuntimeQueryResult::ok_message(
1402 raw_query.to_string(),
1403 &format!(
1404 "{affected} entities truncated from {label} '{}'",
1405 query.name
1406 ),
1407 "truncate",
1408 ))
1409 }
1410
1411 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1412 let store = self.inner.db.store();
1413 let Some(manager) = store.get_collection(name) else {
1414 return Ok(0);
1415 };
1416 let entities = manager.query_all(|_| true);
1417 if entities.is_empty() {
1418 return Ok(0);
1419 }
1420
1421 for entity in &entities {
1422 let fields = entity_index_fields(&entity.data);
1423 self.inner
1424 .index_store
1425 .index_entity_delete(name, entity.id, &fields)
1426 .map_err(RedDBError::Internal)?;
1427 }
1428
1429 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1430 let deleted_ids = store
1431 .delete_batch(name, &ids)
1432 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1433 for id in &deleted_ids {
1434 store.context_index().remove_entity(*id);
1435 }
1436 Ok(deleted_ids.len() as u64)
1437 }
1438
1439 fn drop_collection_storage(
1440 &self,
1441 raw_query: &str,
1442 name: &str,
1443 label: &str,
1444 ) -> RedDBResult<RuntimeQueryResult> {
1445 let store = self.inner.db.store();
1446
1447 let final_count = store
1450 .get_collection(name)
1451 .map(|manager| manager.query_all(|_| true).len() as u64)
1452 .unwrap_or(0);
1453 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1454 self,
1455 name,
1456 final_count,
1457 )?;
1458
1459 let orphaned_indices: Vec<String> = self
1460 .inner
1461 .index_store
1462 .list_indices(name)
1463 .into_iter()
1464 .map(|index| index.name)
1465 .collect();
1466 for index_name in &orphaned_indices {
1467 self.inner.index_store.drop_index(index_name, name);
1468 }
1469
1470 store
1471 .drop_collection(name)
1472 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1473 self.inner.db.invalidate_vector_index(name);
1474 self.inner.db.clear_collection_default_ttl_ms(name);
1475 self.inner
1476 .db
1477 .remove_collection_contract(name)
1478 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1479 self.clear_table_planner_stats(name);
1480 self.invalidate_result_cache();
1481 if let Some(store) = self.inner.auth_store.read().clone() {
1482 store.invalidate_visible_collections_cache();
1483 }
1484 self.inner
1485 .db
1486 .persist_metadata()
1487 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1488 self.schema_vocabulary_apply(
1489 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1490 collection: name.to_string(),
1491 },
1492 );
1493
1494 Ok(RuntimeQueryResult::ok_message(
1495 raw_query.to_string(),
1496 &format!("{label} '{name}' dropped"),
1497 "drop",
1498 ))
1499 }
1500}
1501
/// Reports whether `name` belongs to the reserved system schema: the bare
/// name `red`, anything under the `red.` namespace, or anything prefixed with
/// `__red_schema_`.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(prefix))
}
1505
1506fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1507 match data {
1508 EntityData::Row(row) => {
1509 if let Some(ref named) = row.named {
1510 named
1511 .iter()
1512 .map(|(key, value)| (key.clone(), value.clone()))
1513 .collect()
1514 } else if let Some(ref schema) = row.schema {
1515 schema
1516 .iter()
1517 .zip(row.columns.iter())
1518 .map(|(key, value)| (key.clone(), value.clone()))
1519 .collect()
1520 } else {
1521 Vec::new()
1522 }
1523 }
1524 EntityData::Node(node) => node
1525 .properties
1526 .iter()
1527 .map(|(key, value)| (key.clone(), value.clone()))
1528 .collect(),
1529 _ => Vec::new(),
1530 }
1531}
1532
1533fn collection_contract_from_create_table(
1534 query: &CreateTableQuery,
1535) -> RedDBResult<crate::physical::CollectionContract> {
1536 let now = current_unix_ms();
1537 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1538 .columns
1539 .iter()
1540 .map(declared_column_contract_from_ddl)
1541 .collect();
1542 if query.timestamps {
1543 declared_columns.push(crate::physical::DeclaredColumnContract {
1547 name: "created_at".to_string(),
1548 data_type: "BIGINT".to_string(),
1549 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1550 not_null: true,
1551 default: None,
1552 compress: None,
1553 unique: false,
1554 primary_key: false,
1555 enum_variants: Vec::new(),
1556 array_element: None,
1557 decimal_precision: None,
1558 });
1559 declared_columns.push(crate::physical::DeclaredColumnContract {
1560 name: "updated_at".to_string(),
1561 data_type: "BIGINT".to_string(),
1562 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1563 not_null: true,
1564 default: None,
1565 compress: None,
1566 unique: false,
1567 primary_key: false,
1568 enum_variants: Vec::new(),
1569 array_element: None,
1570 decimal_precision: None,
1571 });
1572 }
1573 Ok(crate::physical::CollectionContract {
1574 name: query.name.clone(),
1575 declared_model: crate::catalog::CollectionModel::Table,
1576 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1577 origin: crate::physical::ContractOrigin::Explicit,
1578 version: 1,
1579 created_at_unix_ms: now,
1580 updated_at_unix_ms: now,
1581 default_ttl_ms: query.default_ttl_ms,
1582 vector_dimension: None,
1583 vector_metric: None,
1584 context_index_fields: query.context_index_fields.clone(),
1585 declared_columns,
1586 table_def: Some(build_table_def_from_create_table(query)?),
1587 timestamps_enabled: query.timestamps,
1588 context_index_enabled: query.context_index_enabled
1589 || !query.context_index_fields.is_empty(),
1590 metrics_raw_retention_ms: None,
1591 metrics_rollup_policies: Vec::new(),
1592 metrics_tenant_identity: None,
1593 metrics_namespace: None,
1594 append_only: query.append_only,
1595 subscriptions: query.subscriptions.clone(),
1596 session_key: None,
1597 session_gap_ms: None,
1598 retention_duration_ms: None,
1599 })
1600}
1601
1602fn default_collection_contract_for_existing_table(
1603 name: &str,
1604) -> crate::physical::CollectionContract {
1605 let now = current_unix_ms();
1606 crate::physical::CollectionContract {
1607 name: name.to_string(),
1608 declared_model: crate::catalog::CollectionModel::Table,
1609 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1610 origin: crate::physical::ContractOrigin::Explicit,
1611 version: 0,
1612 created_at_unix_ms: now,
1613 updated_at_unix_ms: now,
1614 default_ttl_ms: None,
1615 vector_dimension: None,
1616 vector_metric: None,
1617 context_index_fields: Vec::new(),
1618 declared_columns: Vec::new(),
1619 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1620 timestamps_enabled: false,
1621 context_index_enabled: false,
1622 metrics_raw_retention_ms: None,
1623 metrics_rollup_policies: Vec::new(),
1624 metrics_tenant_identity: None,
1625 metrics_namespace: None,
1626 append_only: false,
1627 subscriptions: Vec::new(),
1628 session_key: None,
1629 session_gap_ms: None,
1630 retention_duration_ms: None,
1631 }
1632}
1633
1634fn keyed_collection_contract(
1635 name: &str,
1636 model: crate::catalog::CollectionModel,
1637) -> crate::physical::CollectionContract {
1638 let now = current_unix_ms();
1639 crate::physical::CollectionContract {
1640 name: name.to_string(),
1641 declared_model: model,
1642 schema_mode: crate::catalog::SchemaMode::Dynamic,
1643 origin: crate::physical::ContractOrigin::Explicit,
1644 version: 1,
1645 created_at_unix_ms: now,
1646 updated_at_unix_ms: now,
1647 default_ttl_ms: None,
1648 vector_dimension: None,
1649 vector_metric: None,
1650 context_index_fields: Vec::new(),
1651 declared_columns: Vec::new(),
1652 table_def: None,
1653 timestamps_enabled: false,
1654 context_index_enabled: false,
1655 metrics_raw_retention_ms: None,
1656 metrics_rollup_policies: Vec::new(),
1657 metrics_tenant_identity: None,
1658 metrics_namespace: None,
1659 append_only: false,
1660 subscriptions: Vec::new(),
1661 session_key: None,
1662 session_gap_ms: None,
1663 retention_duration_ms: None,
1664 }
1665}
1666
1667fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1668 let now = current_unix_ms();
1669 crate::physical::CollectionContract {
1670 name: query.name.clone(),
1671 declared_model: crate::catalog::CollectionModel::Metrics,
1672 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1673 origin: crate::physical::ContractOrigin::Explicit,
1674 version: 1,
1675 created_at_unix_ms: now,
1676 updated_at_unix_ms: now,
1677 default_ttl_ms: query.default_ttl_ms,
1678 vector_dimension: None,
1679 vector_metric: None,
1680 context_index_fields: Vec::new(),
1681 declared_columns: Vec::new(),
1682 table_def: None,
1683 timestamps_enabled: false,
1684 context_index_enabled: false,
1685 metrics_raw_retention_ms: query.default_ttl_ms,
1686 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1687 metrics_tenant_identity: Some(
1688 query
1689 .tenant_by
1690 .clone()
1691 .unwrap_or_else(|| "current_tenant".to_string()),
1692 ),
1693 metrics_namespace: Some("default".to_string()),
1694 append_only: true,
1695 subscriptions: Vec::new(),
1696 session_key: None,
1697 session_gap_ms: None,
1698 retention_duration_ms: None,
1699 }
1700}
1701
1702fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1703 let now = current_unix_ms();
1704 crate::physical::CollectionContract {
1705 name: query.name.clone(),
1706 declared_model: crate::catalog::CollectionModel::Vector,
1707 schema_mode: crate::catalog::SchemaMode::Dynamic,
1708 origin: crate::physical::ContractOrigin::Explicit,
1709 version: 1,
1710 created_at_unix_ms: now,
1711 updated_at_unix_ms: now,
1712 default_ttl_ms: None,
1713 vector_dimension: Some(query.dimension),
1714 vector_metric: Some(query.metric),
1715 context_index_fields: Vec::new(),
1716 declared_columns: Vec::new(),
1717 table_def: None,
1718 timestamps_enabled: false,
1719 context_index_enabled: false,
1720 metrics_raw_retention_ms: None,
1721 metrics_rollup_policies: Vec::new(),
1722 metrics_tenant_identity: None,
1723 metrics_namespace: None,
1724 append_only: false,
1725 subscriptions: Vec::new(),
1726 session_key: None,
1727 session_gap_ms: None,
1728 retention_duration_ms: None,
1729 }
1730}
1731
1732fn declared_column_contract_from_ddl(
1733 column: &CreateColumnDef,
1734) -> crate::physical::DeclaredColumnContract {
1735 crate::physical::DeclaredColumnContract {
1736 name: column.name.clone(),
1737 data_type: column.data_type.clone(),
1738 sql_type: Some(column.sql_type.clone()),
1739 not_null: column.not_null,
1740 default: column.default.clone(),
1741 compress: column.compress,
1742 unique: column.unique,
1743 primary_key: column.primary_key,
1744 enum_variants: column.enum_variants.clone(),
1745 array_element: column.array_element.clone(),
1746 decimal_precision: column.decimal_precision,
1747 }
1748}
1749
1750fn apply_alter_operations_to_contract(
1751 contract: &mut crate::physical::CollectionContract,
1752 operations: &[AlterOperation],
1753) {
1754 if contract.table_def.is_none() {
1755 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1756 }
1757 for operation in operations {
1758 match operation {
1759 AlterOperation::AddColumn(column) => {
1760 if !contract
1761 .declared_columns
1762 .iter()
1763 .any(|existing| existing.name == column.name)
1764 {
1765 contract
1766 .declared_columns
1767 .push(declared_column_contract_from_ddl(column));
1768 }
1769 if let Some(table_def) = contract.table_def.as_mut() {
1770 if table_def.get_column(&column.name).is_none() {
1771 if let Ok(column_def) = column_def_from_ddl(column) {
1772 if column.primary_key {
1773 table_def.primary_key.push(column.name.clone());
1774 table_def.constraints.push(
1775 crate::storage::schema::Constraint::new(
1776 format!("pk_{}", column.name),
1777 crate::storage::schema::ConstraintType::PrimaryKey,
1778 )
1779 .on_columns(vec![column.name.clone()]),
1780 );
1781 }
1782 if column.unique {
1783 table_def.constraints.push(
1784 crate::storage::schema::Constraint::new(
1785 format!("uniq_{}", column.name),
1786 crate::storage::schema::ConstraintType::Unique,
1787 )
1788 .on_columns(vec![column.name.clone()]),
1789 );
1790 }
1791 if column.not_null {
1792 table_def.constraints.push(
1793 crate::storage::schema::Constraint::new(
1794 format!("not_null_{}", column.name),
1795 crate::storage::schema::ConstraintType::NotNull,
1796 )
1797 .on_columns(vec![column.name.clone()]),
1798 );
1799 }
1800 table_def.columns.push(column_def);
1801 }
1802 }
1803 }
1804 }
1805 AlterOperation::DropColumn(name) => {
1806 contract
1807 .declared_columns
1808 .retain(|column| column.name != *name);
1809 if let Some(table_def) = contract.table_def.as_mut() {
1810 if let Some(index) = table_def.column_index(name) {
1811 table_def.columns.remove(index);
1812 }
1813 table_def.primary_key.retain(|column| column != name);
1814 table_def.constraints.retain(|constraint| {
1815 !constraint.columns.iter().any(|column| column == name)
1816 });
1817 table_def
1818 .indexes
1819 .retain(|index| !index.columns.iter().any(|column| column == name));
1820 }
1821 }
1822 AlterOperation::RenameColumn { from, to } => {
1823 if contract
1824 .declared_columns
1825 .iter()
1826 .any(|column| column.name == *to)
1827 {
1828 continue;
1829 }
1830 if let Some(column) = contract
1831 .declared_columns
1832 .iter_mut()
1833 .find(|column| column.name == *from)
1834 {
1835 column.name = to.clone();
1836 }
1837 if let Some(table_def) = contract.table_def.as_mut() {
1838 if let Some(column) = table_def
1839 .columns
1840 .iter_mut()
1841 .find(|column| column.name == *from)
1842 {
1843 column.name = to.clone();
1844 }
1845 for primary_key in &mut table_def.primary_key {
1846 if *primary_key == *from {
1847 *primary_key = to.clone();
1848 }
1849 }
1850 for constraint in &mut table_def.constraints {
1851 for column in &mut constraint.columns {
1852 if *column == *from {
1853 *column = to.clone();
1854 }
1855 }
1856 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1857 for column in ref_columns {
1858 if *column == *from {
1859 *column = to.clone();
1860 }
1861 }
1862 }
1863 }
1864 for index in &mut table_def.indexes {
1865 for column in &mut index.columns {
1866 if *column == *from {
1867 *column = to.clone();
1868 }
1869 }
1870 }
1871 }
1872 }
1873 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1876 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1880 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1883 AlterOperation::SetAppendOnly(on) => {
1884 contract.append_only = *on;
1885 }
1886 AlterOperation::SetVersioned(_) => {}
1889 AlterOperation::EnableEvents(subscription) => {
1890 let mut subscription = subscription.clone();
1891 subscription.source = contract.name.clone();
1892 subscription.enabled = true;
1893 if let Some(existing) = contract
1894 .subscriptions
1895 .iter_mut()
1896 .find(|existing| existing.target_queue == subscription.target_queue)
1897 {
1898 *existing = subscription;
1899 } else {
1900 contract.subscriptions.push(subscription);
1901 }
1902 }
1903 AlterOperation::DisableEvents => {
1904 for subscription in &mut contract.subscriptions {
1905 subscription.enabled = false;
1906 }
1907 }
1908 AlterOperation::AddSubscription { name, descriptor } => {
1909 let mut sub = descriptor.clone();
1910 sub.name = name.clone();
1911 sub.source = contract.name.clone();
1912 sub.enabled = true;
1913 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1914 {
1915 *existing = sub;
1916 } else {
1917 contract.subscriptions.push(sub);
1918 }
1919 }
1920 AlterOperation::DropSubscription { name } => {
1921 contract.subscriptions.retain(|s| s.name != *name);
1922 }
1923 AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
1928 AlterOperation::SetRetention { duration_ms } => {
1929 contract.retention_duration_ms = Some(*duration_ms);
1930 }
1931 AlterOperation::UnsetRetention => {
1932 contract.retention_duration_ms = None;
1933 }
1934 }
1935 }
1936}
1937
1938pub(crate) fn retention_timestamp_column_exists(
1943 contract: &crate::physical::CollectionContract,
1944) -> bool {
1945 if contract.timestamps_enabled {
1946 return true;
1947 }
1948 if matches!(
1949 contract.declared_model,
1950 crate::catalog::CollectionModel::TimeSeries
1951 | crate::catalog::CollectionModel::Metrics
1952 ) {
1953 return true;
1957 }
1958 contract
1959 .declared_columns
1960 .iter()
1961 .any(|column| is_temporal_data_type(&column.data_type))
1962}
1963
/// Reports whether `data_type` names a temporal column type, ignoring ASCII
/// case: `TIMESTAMP`, `TIMESTAMPMS`, `TIMESTAMP_MS`, `DATETIME` or `DATE`.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];

    TEMPORAL_TYPES
        .iter()
        .any(|candidate| data_type.eq_ignore_ascii_case(candidate))
}
1971
1972fn validate_event_subscriptions(
1973 runtime: &RedDBRuntime,
1974 source: &str,
1975 subscriptions: &[crate::catalog::SubscriptionDescriptor],
1976) -> RedDBResult<()> {
1977 for subscription in subscriptions
1978 .iter()
1979 .filter(|subscription| subscription.enabled)
1980 {
1981 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1982 return Err(RedDBError::Query(
1983 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1984 ));
1985 }
1986 validate_subscription_auth(runtime, source, subscription)?;
1987 if subscription.target_queue == source
1988 || subscription_would_create_cycle(
1989 &runtime.inner.db,
1990 source,
1991 &subscription.target_queue,
1992 )
1993 {
1994 return Err(RedDBError::Query(
1995 "subscription would create cycle".to_string(),
1996 ));
1997 }
1998 audit_subscription_redact_gap(runtime, source, subscription);
1999 }
2000 Ok(())
2001}
2002
2003fn validate_subscription_auth(
2004 runtime: &RedDBRuntime,
2005 source: &str,
2006 subscription: &crate::catalog::SubscriptionDescriptor,
2007) -> RedDBResult<()> {
2008 let auth_store = match runtime.inner.auth_store.read().clone() {
2009 Some(store) => store,
2010 None => return Ok(()),
2011 };
2012 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2013 Some(identity) => identity,
2014 None => return Ok(()),
2015 };
2016 let tenant = crate::runtime::impl_core::current_tenant();
2017 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2018
2019 if auth_store.iam_authorization_enabled() {
2020 let ctx = crate::auth::policies::EvalContext {
2021 principal_tenant: tenant.clone(),
2022 current_tenant: tenant.clone(),
2023 peer_ip: None,
2024 mfa_present: false,
2025 now_ms: crate::auth::now_ms(),
2026 principal_is_admin_role: role == crate::auth::Role::Admin,
2027 };
2028 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2029 if let Some(t) = tenant.as_deref() {
2030 source_resource = source_resource.with_tenant(t.to_string());
2031 }
2032 if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
2033 return Err(RedDBError::Query(format!(
2034 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2035 principal, source_resource.kind, source_resource.name
2036 )));
2037 }
2038
2039 let mut target_resource =
2040 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2041 if let Some(t) = tenant.as_deref() {
2042 target_resource = target_resource.with_tenant(t.to_string());
2043 }
2044 if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
2045 return Err(RedDBError::Query(format!(
2046 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2047 principal, target_resource.kind, target_resource.name
2048 )));
2049 }
2050 return Ok(());
2051 }
2052
2053 let ctx = crate::auth::privileges::AuthzContext {
2054 principal: &username,
2055 effective_role: role,
2056 tenant: tenant.as_deref(),
2057 };
2058 auth_store
2059 .check_grant(
2060 &ctx,
2061 crate::auth::privileges::Action::Select,
2062 &crate::auth::privileges::Resource::table_from_name(source),
2063 )
2064 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2065 auth_store
2066 .check_grant(
2067 &ctx,
2068 crate::auth::privileges::Action::Insert,
2069 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2070 )
2071 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2072 Ok(())
2073}
2074
2075fn audit_subscription_redact_gap(
2076 runtime: &RedDBRuntime,
2077 source: &str,
2078 subscription: &crate::catalog::SubscriptionDescriptor,
2079) {
2080 let auth_store = match runtime.inner.auth_store.read().clone() {
2081 Some(store) if store.iam_authorization_enabled() => store,
2082 _ => return,
2083 };
2084 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2085 Some(identity) => identity,
2086 None => return,
2087 };
2088 let tenant = crate::runtime::impl_core::current_tenant();
2089 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2090 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2091 if missing.is_empty() {
2092 return;
2093 }
2094
2095 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2096 tracing::warn!(
2097 target: "reddb::operator",
2098 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2099 source,
2100 subscription.target_queue,
2101 columns
2102 );
2103 let mut event = AuditEvent::builder("subscription_redact_gap")
2104 .principal(username)
2105 .source(AuditAuthSource::System)
2106 .resource(format!(
2107 "subscription:{}->{}",
2108 source, subscription.target_queue
2109 ))
2110 .outcome(Outcome::Success)
2111 .field(AuditFieldEscaper::field("source", source))
2112 .field(AuditFieldEscaper::field(
2113 "target_queue",
2114 subscription.target_queue.clone(),
2115 ))
2116 .field(AuditFieldEscaper::field(
2117 "subscription",
2118 subscription.name.clone(),
2119 ))
2120 .field(AuditFieldEscaper::field("columns", columns))
2121 .field(AuditFieldEscaper::field("role", role.as_str()));
2122 if let Some(t) = tenant {
2123 event = event.tenant(t);
2124 }
2125 runtime.inner.audit_log.record_event(event.build());
2126}
2127
2128fn subscription_redact_gap_columns(
2129 auth_store: &crate::auth::store::AuthStore,
2130 principal: &crate::auth::UserId,
2131 source: &str,
2132 subscription: &crate::catalog::SubscriptionDescriptor,
2133) -> BTreeSet<String> {
2134 let redacted: HashSet<String> = subscription
2135 .redact_fields
2136 .iter()
2137 .map(|field| field.to_ascii_lowercase())
2138 .collect();
2139 auth_store
2140 .effective_policies(principal)
2141 .iter()
2142 .flat_map(|policy| policy.statements.iter())
2143 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2144 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2145 .flat_map(|statement| statement.resources.iter())
2146 .filter_map(|resource| denied_column_for_source(resource, source))
2147 .filter(|column| !redact_covers_column(&redacted, source, column))
2148 .collect()
2149}
2150
2151fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2152 match pattern {
2153 crate::auth::policies::ActionPattern::Wildcard => true,
2154 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2155 crate::auth::policies::ActionPattern::Prefix(prefix) => {
2156 "select".len() > prefix.len() + 1
2157 && "select".starts_with(prefix)
2158 && "select".as_bytes()[prefix.len()] == b':'
2159 }
2160 }
2161}
2162
2163fn denied_column_for_source(
2164 resource: &crate::auth::policies::ResourcePattern,
2165 source: &str,
2166) -> Option<String> {
2167 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2168 return None;
2169 };
2170 if kind != "column" {
2171 return None;
2172 }
2173 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2174 (column.table_resource_name() == source).then_some(column.column)
2175}
2176
/// Reports whether the (already lowercased) redact set covers `column` of
/// `source`: via the `*` wildcard, the bare column name, or the qualified
/// `source.column` form. `source` and `column` are lowercased before lookup.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }

    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }

    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
2182
2183fn subscription_would_create_cycle(
2184 db: &crate::storage::unified::devx::RedDB,
2185 source: &str,
2186 target: &str,
2187) -> bool {
2188 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2189 for contract in db.collection_contracts() {
2190 for subscription in contract
2191 .subscriptions
2192 .into_iter()
2193 .filter(|subscription| subscription.enabled)
2194 {
2195 graph
2196 .entry(subscription.source)
2197 .or_default()
2198 .push(subscription.target_queue);
2199 }
2200 }
2201 graph
2202 .entry(source.to_string())
2203 .or_default()
2204 .push(target.to_string());
2205
2206 let mut stack = vec![target.to_string()];
2207 let mut seen = HashSet::new();
2208 while let Some(node) = stack.pop() {
2209 if node == source {
2210 return true;
2211 }
2212 if !seen.insert(node.clone()) {
2213 continue;
2214 }
2215 if let Some(next) = graph.get(&node) {
2216 stack.extend(next.iter().cloned());
2217 }
2218 }
2219 false
2220}
2221
2222pub(crate) fn ensure_event_target_queue_pub(
2223 runtime: &RedDBRuntime,
2224 queue: &str,
2225) -> RedDBResult<()> {
2226 ensure_event_target_queue(runtime, queue)
2227}
2228
2229fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2230 let store = runtime.inner.db.store();
2231 if store.get_collection(queue).is_some() {
2232 return Ok(());
2233 }
2234 store
2235 .create_collection(queue)
2236 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2237 runtime
2238 .inner
2239 .db
2240 .save_collection_contract(event_queue_collection_contract(queue))
2241 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2242 store.set_config_tree(
2243 &format!("queue.{queue}.mode"),
2244 &crate::serde_json::Value::String("fanout".to_string()),
2245 );
2246 Ok(())
2247}
2248
2249fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2250 let now = current_unix_ms();
2251 crate::physical::CollectionContract {
2252 name: queue.to_string(),
2253 declared_model: crate::catalog::CollectionModel::Queue,
2254 schema_mode: crate::catalog::SchemaMode::Dynamic,
2255 origin: crate::physical::ContractOrigin::Implicit,
2256 version: 1,
2257 created_at_unix_ms: now,
2258 updated_at_unix_ms: now,
2259 default_ttl_ms: None,
2260 vector_dimension: None,
2261 vector_metric: None,
2262 context_index_fields: Vec::new(),
2263 declared_columns: Vec::new(),
2264 table_def: None,
2265 timestamps_enabled: false,
2266 context_index_enabled: false,
2267 metrics_raw_retention_ms: None,
2268 metrics_rollup_policies: Vec::new(),
2269 metrics_tenant_identity: None,
2270 metrics_namespace: None,
2271 append_only: true,
2272 subscriptions: Vec::new(),
2273 session_key: None,
2274 session_gap_ms: None,
2275 retention_duration_ms: None,
2276 }
2277}
2278
2279fn build_table_def_from_create_table(
2280 query: &CreateTableQuery,
2281) -> RedDBResult<crate::storage::schema::TableDef> {
2282 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2283 for column in &query.columns {
2284 if column.primary_key {
2285 table.primary_key.push(column.name.clone());
2286 table.constraints.push(
2287 crate::storage::schema::Constraint::new(
2288 format!("pk_{}", column.name),
2289 crate::storage::schema::ConstraintType::PrimaryKey,
2290 )
2291 .on_columns(vec![column.name.clone()]),
2292 );
2293 }
2294 if column.unique {
2295 table.constraints.push(
2296 crate::storage::schema::Constraint::new(
2297 format!("uniq_{}", column.name),
2298 crate::storage::schema::ConstraintType::Unique,
2299 )
2300 .on_columns(vec![column.name.clone()]),
2301 );
2302 }
2303 if column.not_null {
2304 table.constraints.push(
2305 crate::storage::schema::Constraint::new(
2306 format!("not_null_{}", column.name),
2307 crate::storage::schema::ConstraintType::NotNull,
2308 )
2309 .on_columns(vec![column.name.clone()]),
2310 );
2311 }
2312 table.columns.push(column_def_from_ddl(column)?);
2313 }
2314 if query.timestamps {
2319 table.columns.push(
2320 crate::storage::schema::ColumnDef::new(
2321 "created_at".to_string(),
2322 crate::storage::schema::DataType::UnsignedInteger,
2323 )
2324 .not_null(),
2325 );
2326 table.columns.push(
2327 crate::storage::schema::ColumnDef::new(
2328 "updated_at".to_string(),
2329 crate::storage::schema::DataType::UnsignedInteger,
2330 )
2331 .not_null(),
2332 );
2333 table.constraints.push(
2334 crate::storage::schema::Constraint::new(
2335 "not_null_created_at".to_string(),
2336 crate::storage::schema::ConstraintType::NotNull,
2337 )
2338 .on_columns(vec!["created_at".to_string()]),
2339 );
2340 table.constraints.push(
2341 crate::storage::schema::Constraint::new(
2342 "not_null_updated_at".to_string(),
2343 crate::storage::schema::ConstraintType::NotNull,
2344 )
2345 .on_columns(vec!["updated_at".to_string()]),
2346 );
2347 }
2348 table
2349 .validate()
2350 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2351 Ok(table)
2352}
2353
2354fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2355 let data_type = resolve_declared_data_type(&column.data_type)
2356 .map_err(|err| RedDBError::Query(err.to_string()))?;
2357 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2358 if column.not_null {
2359 column_def = column_def.not_null();
2360 }
2361 if let Some(default) = &column.default {
2362 column_def = column_def.with_default(default.as_bytes().to_vec());
2363 }
2364 if column.compress.unwrap_or(0) > 0 {
2365 column_def = column_def.compressed();
2366 }
2367 if !column.enum_variants.is_empty() {
2368 column_def = column_def.with_variants(column.enum_variants.clone());
2369 }
2370 if let Some(precision) = column.decimal_precision {
2371 column_def = column_def.with_precision(precision);
2372 }
2373 if let Some(element_type) = &column.array_element {
2374 column_def = column_def.with_element_type(
2375 resolve_declared_data_type(element_type)
2376 .map_err(|err| RedDBError::Query(err.to_string()))?,
2377 );
2378 }
2379 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2380 if column.unique {
2381 column_def = column_def.with_metadata("unique", "true");
2382 }
2383 if column.primary_key {
2384 column_def = column_def.with_metadata("primary_key", "true");
2385 }
2386 Ok(column_def)
2387}
2388
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0` rather than an error.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2395
#[cfg(test)]
mod tests {
    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
    use crate::auth::store::{AuthStore, PrincipalRef};
    use crate::auth::UserId;
    use crate::auth::{AuthConfig, Role};
    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
    use crate::storage::schema::Value;
    use crate::{RedDBOptions, RedDBRuntime};
    use std::sync::Arc;

    /// Creates a fresh in-memory runtime for a single test.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap()
    }

    /// Builds a single-statement `Allow` policy for `action` on `resource`.
    fn allow_policy(id: &str, action: &str, resource: ResourcePattern) -> Policy {
        Policy {
            id: id.to_string(),
            version: 1,
            tenant: None,
            created_at: 0,
            updated_at: 0,
            statements: vec![Statement {
                sid: None,
                effect: Effect::Allow,
                actions: vec![ActionPattern::Exact(action.to_string())],
                resources: vec![resource],
                condition: None,
            }],
        }
    }

    /// Builds an `Allow` policy for `action` on exactly one named collection.
    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
        allow_policy(
            id,
            action,
            ResourcePattern::Exact {
                kind: "collection".to_string(),
                name: collection.to_string(),
            },
        )
    }

    /// Installs a default-configured auth store on the runtime and returns it.
    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
        let store = Arc::new(AuthStore::new(AuthConfig::default()));
        *rt.inner.auth_store.write() = Some(store.clone());
        store
    }

    /// Stores `policy` and attaches it to the user named `user`.
    fn grant(store: &AuthStore, user: &str, policy: Policy) {
        let policy_id = policy.id.clone();
        store.put_policy_internal(policy).unwrap();
        store
            .attach_policy(
                PrincipalRef::User(UserId::from_parts(None, user)),
                policy_id.as_str(),
            )
            .unwrap();
    }

    /// Clears the current auth identity when dropped, including during a
    /// panic unwind, so a failing test cannot skip the cleanup call.
    struct IdentityGuard;

    impl Drop for IdentityGuard {
        fn drop(&mut self) {
            clear_current_auth_identity();
        }
    }

    /// Runs `action` with `user`/`role` installed as the current auth
    /// identity and always clears the identity afterwards.
    fn with_identity<T>(user: &str, role: Role, action: impl FnOnce() -> T) -> T {
        set_current_auth_identity(user.to_string(), role);
        let _guard = IdentityGuard;
        action()
    }

    /// Reads up to 100 entries from `queue` via `QUEUE PEEK` and decodes each
    /// `payload` column as JSON.
    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
        let result = rt
            .execute_query(&format!("QUEUE PEEK {queue} 100"))
            .expect("peek queue");
        result
            .result
            .records
            .iter()
            .map(
                |record| match record.get("payload").expect("payload column") {
                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
                    other => panic!("expected JSON queue payload, got {other:?}"),
                },
            )
            .collect()
    }

    /// Asserts that exactly one event was captured and returns it.
    fn single_event<'a>(events: &'a [crate::json::Value], what: &str) -> &'a crate::json::Value {
        assert_eq!(
            events.len(),
            1,
            "expected 1 {what}, got {}: {events:?}",
            events.len()
        );
        &events[0]
    }

    /// Asserts that the string field `key` of `event` equals `expected`.
    fn assert_event_str(event: &crate::json::Value, key: &str, expected: &str) {
        let fields = event.as_object().expect("event is object");
        assert_eq!(
            fields.get(key).and_then(crate::json::Value::as_str),
            Some(expected),
            "unexpected value for event field '{key}'"
        );
    }

    /// Asserts that the unsigned integer field `key` of `event` equals `expected`.
    fn assert_event_u64(event: &crate::json::Value, key: &str, expected: u64) {
        let fields = event.as_object().expect("event is object");
        assert_eq!(
            fields.get(key).and_then(crate::json::Value::as_u64),
            Some(expected),
            "unexpected value for event field '{key}'"
        );
    }

    /// Asserts the fields every lifecycle event is expected to carry:
    /// numeric `ts` and `lsn`, and a non-empty `event_id` string.
    fn assert_event_envelope(event: &crate::json::Value) {
        let fields = event.as_object().expect("event is object");
        assert!(fields
            .get("ts")
            .and_then(crate::json::Value::as_u64)
            .is_some());
        assert!(fields
            .get("lsn")
            .and_then(crate::json::Value::as_u64)
            .is_some());
        assert!(fields
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));
    }

    /// A user whose only policy allows `select` cannot drop a table.
    #[test]
    fn drop_denied_without_iam_policy() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        grant(
            &store,
            "alice",
            allow_policy("select-only", "select", ResourcePattern::Wildcard),
        );

        let err = with_identity("alice", Role::Write, || {
            rt.execute_query("DROP TABLE foo")
        })
        .unwrap_err();
        assert!(
            err.to_string().contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    /// A policy allowing `drop` on the exact collection permits the drop.
    #[test]
    fn drop_allowed_with_explicit_iam_policy() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        grant(
            &store,
            "bob",
            make_allow_policy("allow-drop-bar", "drop", "bar"),
        );

        with_identity("bob", Role::Write, || rt.execute_query("DROP TABLE bar")).unwrap();
    }

    /// A policy allowing `drop` on the wildcard resource permits the drop.
    #[test]
    fn drop_allowed_with_wildcard_iam_policy() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        grant(
            &store,
            "carl",
            allow_policy("allow-drop-all", "drop", ResourcePattern::Wildcard),
        );

        with_identity("carl", Role::Write, || rt.execute_query("DROP TABLE baz")).unwrap();
    }

    /// A user whose only policy allows `select` cannot truncate a table.
    #[test]
    fn truncate_denied_without_iam_policy() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        grant(
            &store,
            "dana",
            allow_policy("select-only-2", "select", ResourcePattern::Wildcard),
        );

        let err = with_identity("dana", Role::Write, || {
            rt.execute_query("TRUNCATE TABLE qux")
        })
        .unwrap_err();
        assert!(
            err.to_string().contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    /// `TRUNCATE TABLE` removes the rows but keeps the contract and the index,
    /// and the table accepts new rows afterwards.
    #[test]
    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
            .unwrap();
        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
            .unwrap();

        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
        assert_eq!(truncated.statement_type, "truncate");
        assert_eq!(truncated.affected_rows, 0);

        let empty = rt.execute_query("SELECT id FROM users").unwrap();
        assert!(empty.result.records.is_empty());

        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
            .unwrap();
        let selected = rt
            .execute_query("SELECT name FROM users WHERE id = 3")
            .unwrap();
        let name = selected.result.records[0].get("name").unwrap();
        assert_eq!(name, &Value::text("cy"));
        assert!(rt.db().collection_contract("users").is_some());
        assert!(rt
            .inner
            .index_store
            .list_indices("users")
            .iter()
            .any(|index| index.name == "idx_users_id"));
    }

    /// `TRUNCATE TABLE` on a queue is rejected as a model mismatch, while
    /// `TRUNCATE COLLECTION` empties it.
    #[test]
    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE QUEUE tasks").unwrap();
        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();

        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
        assert!(err
            .to_string()
            .contains("model mismatch: expected table, got queue"));

        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
        assert_eq!(
            len.result.records[0].get("len"),
            Some(&Value::UnsignedInteger(0))
        );
    }

    /// Truncating a collection in the system schema is refused.
    #[test]
    fn truncate_system_schema_is_read_only() {
        let rt = fresh_runtime();
        let err = rt
            .execute_query("TRUNCATE COLLECTION red.collections")
            .unwrap_err();
        assert!(err.to_string().contains("system schema is read-only"));
    }

    /// Truncating an event-enabled table publishes exactly one `truncate`
    /// event carrying the number of removed entities.
    #[test]
    fn truncate_event_enabled_table_emits_single_truncate_event() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query(
            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
        )
        .unwrap();

        // Drain the insert events so only the truncate event remains.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("TRUNCATE TABLE users").unwrap();

        let events = queue_payloads(&rt, "users_events");
        let event = single_event(&events, "truncate event");
        assert_event_str(event, "op", "truncate");
        assert_event_str(event, "collection", "users");
        assert_event_u64(event, "entities_count", 3);
        assert_event_envelope(event);
    }

    /// Truncating a table without an event subscription succeeds and leaves
    /// the table empty.
    ///
    /// NOTE(review): despite the name, no queue is inspected here; the test
    /// only checks that the truncate itself works.
    #[test]
    fn truncate_no_events_collection_emits_nothing() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
            .unwrap();
        rt.execute_query("TRUNCATE TABLE plain").unwrap();
        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
        assert!(rows.result.records.is_empty());
    }

    /// Dropping an event-enabled table publishes exactly one
    /// `collection_dropped` event with the final entity count, and the table
    /// can no longer be queried.
    #[test]
    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .unwrap();

        // Drain the insert events so only the drop event remains.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("DROP TABLE users").unwrap();

        let events = queue_payloads(&rt, "users_events");
        let event = single_event(&events, "collection_dropped event");
        assert_event_str(event, "op", "collection_dropped");
        assert_event_str(event, "collection", "users");
        assert_event_u64(event, "final_entities_count", 2);
        assert_event_envelope(event);

        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
        assert!(
            err.to_string().contains("users"),
            "expected not-found error"
        );
    }

    /// Dropping a table without an event subscription succeeds and the table
    /// can no longer be queried.
    ///
    /// NOTE(review): despite the name, no queue is inspected here.
    #[test]
    fn drop_no_events_collection_emits_nothing() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("DROP TABLE plain").unwrap();
        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
        assert!(err.to_string().contains("plain"));
    }

    /// With `WITH EVENTS (INSERT)`, updates and deletes publish nothing.
    #[test]
    fn ops_filter_insert_only_ignores_update_and_delete() {
        let rt = fresh_runtime();
        rt.execute_query(
            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
        )
        .unwrap();
        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
            .unwrap();
        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();

        let events = queue_payloads(&rt, "items_events");
        let event = single_event(&events, "insert event");
        assert_event_str(event, "op", "insert");
    }

    /// With an event `WHERE` filter, only rows matching the predicate
    /// publish an event.
    #[test]
    fn where_filter_skips_rows_that_do_not_match() {
        let rt = fresh_runtime();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
            .unwrap();

        let events = queue_payloads(&rt, "users_events");
        let event = single_event(&events, "event (only active)");
        assert_event_str(event, "op", "insert");
        let after = event
            .as_object()
            .unwrap()
            .get("after")
            .unwrap()
            .as_object()
            .unwrap();
        assert_eq!(
            after.get("status").and_then(crate::json::Value::as_str),
            Some("active")
        );
    }

    /// An operation filter and a `WHERE` filter combine: of the four
    /// statements below only the insert of the active row publishes an event.
    #[test]
    fn ops_filter_and_where_filter_combined() {
        let rt = fresh_runtime();
        rt.execute_query(
            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
            .unwrap();
        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
            .unwrap();
        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
            .unwrap();
        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "items_events");
        let event = single_event(&events, "event");
        assert_event_str(event, "op", "insert");
    }

    /// With `(DELETE) WHERE status = 'active'`, deleting the active row
    /// publishes an event and deleting the inactive row does not.
    #[test]
    fn where_filter_on_delete_checks_before_state() {
        let rt = fresh_runtime();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
            .unwrap();

        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "users_events");
        let event = single_event(&events, "delete event");
        assert_event_str(event, "op", "delete");
    }

    /// Adding a column to an event-enabled table updates the contract and
    /// keeps the subscription enabled.
    #[test]
    fn alter_add_column_on_event_enabled_table_succeeds() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
            .unwrap();
        let contract = rt.db().collection_contract("users").unwrap();
        assert!(
            contract.declared_columns.iter().any(|c| c.name == "phone"),
            "phone column should be in contract"
        );
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }

    /// Dropping a column and enabling row level security on an event-enabled
    /// table both succeed and keep the subscription enabled.
    #[test]
    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
        let rt = fresh_runtime();
        rt.execute_query(
            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
        )
        .unwrap();
        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
            .unwrap();
        let contract = rt.db().collection_contract("items").unwrap();
        assert!(
            !contract.declared_columns.iter().any(|c| c.name == "secret"),
            "secret column should be removed"
        );
        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
            .unwrap();
        // Re-read the contract: the value fetched above predates the RLS
        // change and would not show a subscription lost by that statement.
        let contract = rt.db().collection_contract("items").unwrap();
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }
}