1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault KV key used for a collection's own master key.
///
/// The result always has the shape `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 crate::reserved_fields::ensure_no_reserved_public_item_fields(
37 query.columns.iter().map(|column| column.name.as_str()),
38 &format!("table '{}'", query.name),
39 )?;
40 let exists = store.get_collection(&query.name).is_some();
42 if exists {
43 if query.if_not_exists {
44 return Ok(RuntimeQueryResult::ok_message(
45 raw_query.to_string(),
46 &format!("table '{}' already exists", query.name),
47 "create",
48 ));
49 }
50 return Err(RedDBError::Query(format!(
51 "table '{}' already exists",
52 query.name
53 )));
54 }
55
56 let contract = collection_contract_from_create_table(query)?;
59 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60 store
62 .create_collection(&query.name)
63 .map_err(|err| RedDBError::Internal(err.to_string()))?;
64 for subscription in &contract.subscriptions {
65 ensure_event_target_queue(self, &subscription.target_queue)?;
66 }
67 if let Some(default_ttl_ms) = query.default_ttl_ms {
68 self.inner
69 .db
70 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71 }
72 self.inner
73 .db
74 .save_collection_contract(contract)
75 .map_err(|err| RedDBError::Internal(err.to_string()))?;
76 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77 store.set_config_tree(
78 &format!("red.collection_tenants.{}", query.name),
79 &crate::serde_json::Value::String(tenant_id),
80 );
81 }
82 self.inner
83 .db
84 .persist_metadata()
85 .map_err(|err| RedDBError::Internal(err.to_string()))?;
86 self.refresh_table_planner_stats(&query.name);
87 self.invalidate_result_cache();
88 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99 if let Some(spec) = &query.partition_by {
106 let kind_str = match spec.kind {
107 crate::storage::query::ast::PartitionKind::Range => "range",
108 crate::storage::query::ast::PartitionKind::List => "list",
109 crate::storage::query::ast::PartitionKind::Hash => "hash",
110 };
111 store.set_config_tree(
112 &format!("partition.{}.by", query.name),
113 &crate::serde_json::Value::String(kind_str.to_string()),
114 );
115 store.set_config_tree(
116 &format!("partition.{}.column", query.name),
117 &crate::serde_json::Value::String(spec.column.clone()),
118 );
119 }
120
121 if let Some(col) = &query.tenant_by {
132 store.set_config_tree(
133 &format!("tenant_tables.{}.column", query.name),
134 &crate::serde_json::Value::String(col.clone()),
135 );
136 self.register_tenant_table(&query.name, col);
137 }
138
139 let ttl_suffix = query
140 .default_ttl_ms
141 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142 .unwrap_or_default();
143
144 let tenant_suffix = query
145 .tenant_by
146 .as_ref()
147 .map(|col| format!(" (tenant-scoped by {col})"))
148 .unwrap_or_default();
149
150 Ok(RuntimeQueryResult::ok_message(
151 raw_query.to_string(),
152 &format!(
153 "table '{}' created{}{}",
154 query.name, ttl_suffix, tenant_suffix
155 ),
156 "create",
157 ))
158 }
159
160 fn execute_create_keyed_collection(
161 &self,
162 raw_query: &str,
163 query: &CreateTableQuery,
164 ) -> RedDBResult<RuntimeQueryResult> {
165 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166 if is_system_schema_name(&query.name) {
167 return Err(RedDBError::Query("system schema is read-only".to_string()));
168 }
169 let store = self.inner.db.store();
170 let label = polymorphic_resolver::model_name(query.collection_model);
171 if store.get_collection(&query.name).is_some() {
172 if query.if_not_exists {
173 return Ok(RuntimeQueryResult::ok_message(
174 raw_query.to_string(),
175 &format!("{label} '{}' already exists", query.name),
176 "create",
177 ));
178 }
179 return Err(RedDBError::Query(format!(
180 "{label} '{}' already exists",
181 query.name
182 )));
183 }
184
185 store
186 .create_collection(&query.name)
187 .map_err(|err| RedDBError::Internal(err.to_string()))?;
188 if query.collection_model == CollectionModel::Vault {
189 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190 let key_scope = if query.vault_own_master_key {
191 "own"
192 } else {
193 "cluster"
194 };
195 store.set_config_tree(
196 &format!("red.vault.{}.key_scope", query.name),
197 &crate::serde_json::Value::String(key_scope.to_string()),
198 );
199 store.set_config_tree(
200 &format!("red.vault.{}.status", query.name),
201 &crate::serde_json::Value::String("sealed".to_string()),
202 );
203 }
204 if query.collection_model == CollectionModel::Metrics {
205 for spec in &query.metrics_rollup_policies {
206 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207 .ok_or_else(|| {
208 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209 })?;
210 if policy.source != "raw" {
211 return Err(RedDBError::Query(format!(
212 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213 spec
214 )));
215 }
216 if !matches!(
217 policy.aggregation.as_str(),
218 "avg" | "sum" | "min" | "max" | "count"
219 ) {
220 return Err(RedDBError::Query(format!(
221 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222 spec
223 )));
224 }
225 }
226 if let Some(raw_retention_ms) = query.default_ttl_ms {
227 self.inner
228 .db
229 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230 store.set_config_tree(
231 &format!("red.metrics.{}.raw_retention_ms", query.name),
232 &crate::serde_json::Value::Number(raw_retention_ms as f64),
233 );
234 }
235 let tenant_identity = query
236 .tenant_by
237 .clone()
238 .unwrap_or_else(|| "current_tenant".to_string());
239 store.set_config_tree(
240 &format!("red.metrics.{}.tenant_identity", query.name),
241 &crate::serde_json::Value::String(tenant_identity),
242 );
243 store.set_config_tree(
244 &format!("red.metrics.{}.namespace", query.name),
245 &crate::serde_json::Value::String("default".to_string()),
246 );
247 if !query.metrics_rollup_policies.is_empty() {
248 store.set_config_tree(
249 &format!("red.metrics.{}.rollup_policies", query.name),
250 &crate::serde_json::Value::Array(
251 query
252 .metrics_rollup_policies
253 .iter()
254 .cloned()
255 .map(crate::serde_json::Value::String)
256 .collect(),
257 ),
258 );
259 }
260 }
261 let contract = if query.collection_model == CollectionModel::Metrics {
262 metrics_collection_contract(query)
263 } else {
264 keyed_collection_contract(&query.name, query.collection_model)
265 };
266 self.inner
267 .db
268 .save_collection_contract(contract)
269 .map_err(|err| RedDBError::Internal(err.to_string()))?;
270 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271 store.set_config_tree(
272 &format!("red.collection_tenants.{}", query.name),
273 &crate::serde_json::Value::String(tenant_id),
274 );
275 }
276 self.inner
277 .db
278 .persist_metadata()
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 self.invalidate_result_cache();
281
282 Ok(RuntimeQueryResult::ok_message(
283 raw_query.to_string(),
284 &format!("{label} '{}' created", query.name),
285 "create",
286 ))
287 }
288
289 pub fn execute_create_collection(
290 &self,
291 raw_query: &str,
292 query: &CreateCollectionQuery,
293 ) -> RedDBResult<RuntimeQueryResult> {
294 let model = match query.kind.as_str() {
295 "graph" => CollectionModel::Graph,
296 "document" => CollectionModel::Document,
297 "metrics" => CollectionModel::Metrics,
298 "blockchain" => CollectionModel::Table,
303 other => {
304 return Err(RedDBError::Query(format!(
305 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
306 )));
307 }
308 };
309 let create = CreateTableQuery {
310 collection_model: model,
311 name: query.name.clone(),
312 columns: Vec::new(),
313 if_not_exists: query.if_not_exists,
314 default_ttl_ms: None,
315 metrics_rollup_policies: Vec::new(),
316 context_index_fields: Vec::new(),
317 context_index_enabled: false,
318 timestamps: false,
319 partition_by: None,
320 tenant_by: None,
321 append_only: false,
322 subscriptions: Vec::new(),
323 vault_own_master_key: false,
324 };
325 let result = self.execute_create_table(raw_query, &create)?;
326 if query.kind == "blockchain" {
327 self.install_blockchain_kind(&query.name)?;
328 }
329 if !query.allowed_signers.is_empty() {
334 let actor = crate::runtime::impl_core::current_user_projected()
335 .unwrap_or_else(|| "@system/create-collection".to_string());
336 crate::runtime::signed_writes_kind::install(
337 &self.inner.db.store(),
338 &query.name,
339 &query.allowed_signers,
340 &actor,
341 );
342 }
343 Ok(result)
344 }
345
346 fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
350 use crate::runtime::blockchain_kind;
351 use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
352 use std::sync::Arc;
353
354 let store = self.inner.db.store();
355 blockchain_kind::mark_as_chain(&store, name);
356
357 let existing_tip = blockchain_kind::chain_tip(&store, name);
358 if existing_tip.height.is_some() {
359 return Ok(());
360 }
361
362 let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
363 let named: std::collections::HashMap<String, crate::storage::schema::Value> =
364 fields.into_iter().collect();
365 let entity = UnifiedEntity::new(
366 EntityId::new(0),
367 EntityKind::TableRow {
368 table: Arc::from(name),
369 row_id: 0,
370 },
371 EntityData::Row(RowData {
372 columns: Vec::new(),
373 named: Some(named),
374 schema: None,
375 }),
376 );
377 store
378 .insert_auto(name, entity)
379 .map_err(|err| RedDBError::Internal(err.to_string()))?;
380 if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
383 self.inner
384 .chain_tip_cache
385 .lock()
386 .insert(name.to_string(), tip);
387 }
388 Ok(())
389 }
390
391 pub fn execute_create_vector(
392 &self,
393 raw_query: &str,
394 query: &CreateVectorQuery,
395 ) -> RedDBResult<RuntimeQueryResult> {
396 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
397 if is_system_schema_name(&query.name) {
398 return Err(RedDBError::Query("system schema is read-only".to_string()));
399 }
400 let store = self.inner.db.store();
401 if store.get_collection(&query.name).is_some() {
402 if query.if_not_exists {
403 return Ok(RuntimeQueryResult::ok_message(
404 raw_query.to_string(),
405 &format!("vector '{}' already exists", query.name),
406 "create",
407 ));
408 }
409 return Err(RedDBError::Query(format!(
410 "vector '{}' already exists",
411 query.name
412 )));
413 }
414
415 store
416 .create_collection(&query.name)
417 .map_err(|err| RedDBError::Internal(err.to_string()))?;
418 self.inner
419 .db
420 .save_collection_contract(vector_collection_contract(query))
421 .map_err(|err| RedDBError::Internal(err.to_string()))?;
422 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
423 store.set_config_tree(
424 &format!("red.collection_tenants.{}", query.name),
425 &crate::serde_json::Value::String(tenant_id),
426 );
427 }
428 self.inner
429 .db
430 .persist_metadata()
431 .map_err(|err| RedDBError::Internal(err.to_string()))?;
432 self.invalidate_result_cache();
433
434 Ok(RuntimeQueryResult::ok_message(
435 raw_query.to_string(),
436 &format!("vector '{}' created", query.name),
437 "create",
438 ))
439 }
440
441 fn provision_vault_key_material(
442 &self,
443 collection: &str,
444 own_master_key: bool,
445 ) -> RedDBResult<()> {
446 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
447 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
448 })?;
449 if !auth_store.is_vault_backed() {
450 return Err(RedDBError::Query(
451 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
452 ));
453 }
454
455 if auth_store.vault_secret_key().is_none() {
456 let key = crate::auth::store::random_bytes(32);
457 auth_store
458 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
459 .map_err(|err| RedDBError::Query(err.to_string()))?;
460 }
461
462 if own_master_key {
463 let key = crate::auth::store::random_bytes(32);
464 auth_store
465 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
466 .map_err(|err| RedDBError::Query(err.to_string()))?;
467 }
468
469 Ok(())
470 }
471
472 pub fn execute_drop_table(
476 &self,
477 raw_query: &str,
478 query: &DropTableQuery,
479 ) -> RedDBResult<RuntimeQueryResult> {
480 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
481 let store = self.inner.db.store();
482
483 if is_system_schema_name(&query.name) {
484 return Err(RedDBError::Query("system schema is read-only".to_string()));
485 }
486
487 let exists = store.get_collection(&query.name).is_some();
488 if !exists {
489 if query.if_exists {
490 return Ok(RuntimeQueryResult::ok_message(
491 raw_query.to_string(),
492 &format!("table '{}' does not exist", query.name),
493 "drop",
494 ));
495 }
496 return Err(RedDBError::NotFound(format!(
497 "table '{}' not found",
498 query.name
499 )));
500 }
501 let actual =
502 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
503 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
504
505 let final_count = store
508 .get_collection(&query.name)
509 .map(|manager| manager.query_all(|_| true).len() as u64)
510 .unwrap_or(0);
511 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
512 self,
513 &query.name,
514 final_count,
515 )?;
516
517 let orphaned_indices: Vec<String> = self
518 .inner
519 .index_store
520 .list_indices(&query.name)
521 .into_iter()
522 .map(|index| index.name)
523 .collect();
524 for name in &orphaned_indices {
525 self.inner.index_store.drop_index(name, &query.name);
526 }
527
528 store
529 .drop_collection(&query.name)
530 .map_err(|err| RedDBError::Internal(err.to_string()))?;
531 self.inner.db.invalidate_vector_index(&query.name);
532 self.inner.db.clear_collection_default_ttl_ms(&query.name);
533 self.inner
534 .db
535 .remove_collection_contract(&query.name)
536 .map_err(|err| RedDBError::Internal(err.to_string()))?;
537 self.clear_table_planner_stats(&query.name);
538 self.invalidate_result_cache();
539 if let Some(store) = self.inner.auth_store.read().clone() {
543 store.invalidate_visible_collections_cache();
544 }
545 self.inner
546 .db
547 .persist_metadata()
548 .map_err(|err| RedDBError::Internal(err.to_string()))?;
549 self.schema_vocabulary_apply(
555 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
556 collection: query.name.clone(),
557 },
558 );
559
560 Ok(RuntimeQueryResult::ok_message(
561 raw_query.to_string(),
562 &format!("table '{}' dropped", query.name),
563 "drop",
564 ))
565 }
566
567 pub fn execute_drop_graph(
568 &self,
569 raw_query: &str,
570 query: &DropGraphQuery,
571 ) -> RedDBResult<RuntimeQueryResult> {
572 self.execute_drop_typed_collection(
573 raw_query,
574 &query.name,
575 query.if_exists,
576 CollectionModel::Graph,
577 "graph",
578 )
579 }
580
581 pub fn execute_drop_vector(
582 &self,
583 raw_query: &str,
584 query: &DropVectorQuery,
585 ) -> RedDBResult<RuntimeQueryResult> {
586 self.execute_drop_typed_collection(
587 raw_query,
588 &query.name,
589 query.if_exists,
590 CollectionModel::Vector,
591 "vector",
592 )
593 }
594
595 pub fn execute_drop_document(
596 &self,
597 raw_query: &str,
598 query: &DropDocumentQuery,
599 ) -> RedDBResult<RuntimeQueryResult> {
600 self.execute_drop_typed_collection(
601 raw_query,
602 &query.name,
603 query.if_exists,
604 CollectionModel::Document,
605 "document",
606 )
607 }
608
609 pub fn execute_drop_kv(
610 &self,
611 raw_query: &str,
612 query: &DropKvQuery,
613 ) -> RedDBResult<RuntimeQueryResult> {
614 let label = polymorphic_resolver::model_name(query.model);
615 self.execute_drop_typed_collection(
616 raw_query,
617 &query.name,
618 query.if_exists,
619 query.model,
620 label,
621 )
622 }
623
624 pub fn execute_drop_collection(
625 &self,
626 raw_query: &str,
627 query: &DropCollectionQuery,
628 ) -> RedDBResult<RuntimeQueryResult> {
629 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
630 if is_system_schema_name(&query.name) {
631 return Err(RedDBError::Query("system schema is read-only".to_string()));
632 }
633 let store = self.inner.db.store();
634 if store.get_collection(&query.name).is_none() {
635 if query.if_exists {
636 return Ok(RuntimeQueryResult::ok_message(
637 raw_query.to_string(),
638 &format!("collection '{}' does not exist", query.name),
639 "drop",
640 ));
641 }
642 return Err(RedDBError::NotFound(format!(
643 "collection '{}' not found",
644 query.name
645 )));
646 }
647
648 let actual =
649 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
650 if let Some(expected) = query.model {
651 polymorphic_resolver::ensure_model_match(expected, actual)?;
652 }
653
654 match actual {
655 CollectionModel::Table => self.execute_drop_table(
656 raw_query,
657 &DropTableQuery {
658 name: query.name.clone(),
659 if_exists: query.if_exists,
660 },
661 ),
662 CollectionModel::TimeSeries => self.execute_drop_timeseries(
663 raw_query,
664 &DropTimeSeriesQuery {
665 name: query.name.clone(),
666 if_exists: query.if_exists,
667 },
668 ),
669 CollectionModel::Queue => self.execute_drop_queue(
670 raw_query,
671 &DropQueueQuery {
672 name: query.name.clone(),
673 if_exists: query.if_exists,
674 },
675 ),
676 CollectionModel::Graph => self.execute_drop_graph(
677 raw_query,
678 &DropGraphQuery {
679 name: query.name.clone(),
680 if_exists: query.if_exists,
681 },
682 ),
683 CollectionModel::Vector => self.execute_drop_vector(
684 raw_query,
685 &DropVectorQuery {
686 name: query.name.clone(),
687 if_exists: query.if_exists,
688 },
689 ),
690 CollectionModel::Document => self.execute_drop_document(
691 raw_query,
692 &DropDocumentQuery {
693 name: query.name.clone(),
694 if_exists: query.if_exists,
695 },
696 ),
697 CollectionModel::Kv => self.execute_drop_kv(
698 raw_query,
699 &DropKvQuery {
700 name: query.name.clone(),
701 if_exists: query.if_exists,
702 model: CollectionModel::Kv,
703 },
704 ),
705 CollectionModel::Config => self.execute_drop_kv(
706 raw_query,
707 &DropKvQuery {
708 name: query.name.clone(),
709 if_exists: query.if_exists,
710 model: CollectionModel::Config,
711 },
712 ),
713 CollectionModel::Vault => self.execute_drop_kv(
714 raw_query,
715 &DropKvQuery {
716 name: query.name.clone(),
717 if_exists: query.if_exists,
718 model: CollectionModel::Vault,
719 },
720 ),
721 CollectionModel::Hll => self.execute_probabilistic_command(
722 raw_query,
723 &ProbabilisticCommand::DropHll {
724 name: query.name.clone(),
725 if_exists: query.if_exists,
726 },
727 ),
728 CollectionModel::Sketch => self.execute_probabilistic_command(
729 raw_query,
730 &ProbabilisticCommand::DropSketch {
731 name: query.name.clone(),
732 if_exists: query.if_exists,
733 },
734 ),
735 CollectionModel::Filter => self.execute_probabilistic_command(
736 raw_query,
737 &ProbabilisticCommand::DropFilter {
738 name: query.name.clone(),
739 if_exists: query.if_exists,
740 },
741 ),
742 CollectionModel::Metrics => self.execute_drop_typed_collection(
743 raw_query,
744 &query.name,
745 query.if_exists,
746 CollectionModel::Metrics,
747 "metrics",
748 ),
749 CollectionModel::Mixed => self.execute_drop_typed_collection(
750 raw_query,
751 &query.name,
752 query.if_exists,
753 CollectionModel::Mixed,
754 "collection",
755 ),
756 }
757 }
758
759 pub fn execute_alter_table(
765 &self,
766 raw_query: &str,
767 query: &AlterTableQuery,
768 ) -> RedDBResult<RuntimeQueryResult> {
769 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
770 let store = self.inner.db.store();
771
772 if store.get_collection(&query.name).is_none() {
774 return Err(RedDBError::NotFound(format!(
775 "table '{}' not found",
776 query.name
777 )));
778 }
779
780 let mut messages = Vec::new();
781
782 let fields_added: Vec<String> = query
784 .operations
785 .iter()
786 .filter_map(|op| {
787 if let AlterOperation::AddColumn(col) = op {
788 Some(col.name.clone())
789 } else {
790 None
791 }
792 })
793 .collect();
794 let fields_removed: Vec<String> = query
795 .operations
796 .iter()
797 .filter_map(|op| {
798 if let AlterOperation::DropColumn(name) = op {
799 Some(name.clone())
800 } else {
801 None
802 }
803 })
804 .collect();
805
806 for op in &query.operations {
807 match op {
808 AlterOperation::AddColumn(col) => {
809 messages.push(format!("column '{}' added", col.name));
811 }
812 AlterOperation::DropColumn(name) => {
813 messages.push(format!("column '{}' dropped", name));
814 }
815 AlterOperation::RenameColumn { from, to } => {
816 messages.push(format!("column '{}' renamed to '{}'", from, to));
817 }
818 AlterOperation::AttachPartition { child, bound } => {
819 store.set_config_tree(
823 &format!("partition.{}.children.{}", query.name, child),
824 &crate::serde_json::Value::String(bound.clone()),
825 );
826 messages.push(format!(
827 "partition '{child}' attached to '{}' ({bound})",
828 query.name
829 ));
830 }
831 AlterOperation::DetachPartition { child } => {
832 store.set_config_tree(
833 &format!("partition.{}.children.{}", query.name, child),
834 &crate::serde_json::Value::Null,
835 );
836 messages.push(format!(
837 "partition '{child}' detached from '{}'",
838 query.name
839 ));
840 }
841 AlterOperation::EnableRowLevelSecurity => {
842 self.inner
843 .rls_enabled_tables
844 .write()
845 .insert(query.name.clone());
846 store.set_config_tree(
848 &format!("rls.enabled.{}", query.name),
849 &crate::serde_json::Value::Bool(true),
850 );
851 self.invalidate_plan_cache();
852 messages.push(format!("row level security enabled on '{}'", query.name));
853 }
854 AlterOperation::DisableRowLevelSecurity => {
855 self.inner.rls_enabled_tables.write().remove(&query.name);
856 store.set_config_tree(
857 &format!("rls.enabled.{}", query.name),
858 &crate::serde_json::Value::Null,
859 );
860 self.invalidate_plan_cache();
861 messages.push(format!("row level security disabled on '{}'", query.name));
862 }
863 AlterOperation::EnableTenancy { column } => {
865 store.set_config_tree(
866 &format!("tenant_tables.{}.column", query.name),
867 &crate::serde_json::Value::String(column.clone()),
868 );
869 self.register_tenant_table(&query.name, column);
870 self.invalidate_plan_cache();
871 messages.push(format!(
872 "tenancy enabled on '{}' by column '{column}'",
873 query.name
874 ));
875 }
876 AlterOperation::DisableTenancy => {
877 store.set_config_tree(
878 &format!("tenant_tables.{}.column", query.name),
879 &crate::serde_json::Value::Null,
880 );
881 self.unregister_tenant_table(&query.name);
882 self.invalidate_plan_cache();
883 messages.push(format!("tenancy disabled on '{}'", query.name));
884 }
885 AlterOperation::SetAppendOnly(on) => {
886 messages.push(format!(
891 "append_only {} on '{}'",
892 if *on { "enabled" } else { "disabled" },
893 query.name
894 ));
895 }
896 AlterOperation::SetVersioned(on) => {
897 self.vcs_set_versioned(&query.name, *on)?;
904 messages.push(format!(
905 "versioned {} on '{}'",
906 if *on { "enabled" } else { "disabled" },
907 query.name
908 ));
909 }
910 AlterOperation::EnableEvents(subscription) => {
911 let mut subscription = subscription.clone();
912 subscription.source = query.name.clone();
913 validate_event_subscriptions(
914 self,
915 &query.name,
916 std::slice::from_ref(&subscription),
917 )?;
918 ensure_event_target_queue(self, &subscription.target_queue)?;
919 messages.push(format!(
920 "events enabled on '{}' to '{}'",
921 query.name, subscription.target_queue
922 ));
923 }
924 AlterOperation::DisableEvents => {
925 messages.push(format!("events disabled on '{}'", query.name));
926 }
927 AlterOperation::AddSubscription { name, descriptor } => {
928 let mut sub = descriptor.clone();
929 sub.name = name.clone();
930 sub.source = query.name.clone();
931 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
932 ensure_event_target_queue(self, &sub.target_queue)?;
933 messages.push(format!(
934 "subscription '{}' added on '{}' to '{}'",
935 name, query.name, sub.target_queue
936 ));
937 }
938 AlterOperation::DropSubscription { name } => {
939 messages.push(format!(
940 "subscription '{}' dropped on '{}'",
941 name, query.name
942 ));
943 }
944 AlterOperation::AddSigner { pubkey } => {
945 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
952 return Err(RedDBError::Query(format!(
953 "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
954 recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
955 query.name
956 )));
957 }
958 let actor = crate::runtime::impl_core::current_user_projected()
959 .unwrap_or_else(|| "@system/alter".to_string());
960 let changed = crate::runtime::signed_writes_kind::add_signer(
961 &store,
962 &query.name,
963 *pubkey,
964 &actor,
965 );
966 messages.push(format!(
967 "signer {} on '{}'",
968 if changed { "added" } else { "already present" },
969 query.name
970 ));
971 }
972 AlterOperation::RevokeSigner { pubkey } => {
973 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
974 return Err(RedDBError::Query(format!(
975 "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
976 query.name
977 )));
978 }
979 let actor = crate::runtime::impl_core::current_user_projected()
980 .unwrap_or_else(|| "@system/alter".to_string());
981 let changed = crate::runtime::signed_writes_kind::revoke_signer(
982 &store,
983 &query.name,
984 pubkey,
985 &actor,
986 );
987 messages.push(format!(
988 "signer {} on '{}'",
989 if changed {
990 "revoked"
991 } else {
992 "already revoked"
993 },
994 query.name
995 ));
996 }
997 AlterOperation::SetRetention { duration_ms } => {
998 let existing = self.inner.db.collection_contract(&query.name);
1004 let has_ts_column = existing
1005 .as_ref()
1006 .map(retention_timestamp_column_exists)
1007 .unwrap_or(false);
1008 if !has_ts_column {
1009 return Err(RedDBError::Query(format!(
1010 "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1011 column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1012 or enable WITH timestamps = true before setting a \
1013 retention policy",
1014 query.name
1015 )));
1016 }
1017 messages.push(format!(
1018 "retention set to {duration_ms} ms on '{}'",
1019 query.name
1020 ));
1021 }
1022 AlterOperation::UnsetRetention => {
1023 messages.push(format!("retention cleared on '{}'", query.name));
1024 }
1025 }
1026 }
1027
1028 let mut contract = self
1029 .inner
1030 .db
1031 .collection_contract(&query.name)
1032 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1033 apply_alter_operations_to_contract(&mut contract, &query.operations);
1034 contract.version = contract.version.saturating_add(1);
1035 contract.updated_at_unix_ms = current_unix_ms();
1036 self.inner
1037 .db
1038 .save_collection_contract(contract)
1039 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1040 if !fields_added.is_empty() || !fields_removed.is_empty() {
1044 let sub_names: Vec<String> = self
1045 .inner
1046 .db
1047 .collection_contract(&query.name)
1048 .map(|c| {
1049 c.subscriptions
1050 .iter()
1051 .filter(|s| s.enabled)
1052 .map(|s| s.name.clone())
1053 .collect()
1054 })
1055 .unwrap_or_default();
1056 if !sub_names.is_empty() {
1057 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1058 collection: query.name.clone(),
1059 subscription_names: sub_names.join(", "),
1060 fields_added: fields_added.join(", "),
1061 fields_removed: fields_removed.join(", "),
1062 lsn: self.cdc_current_lsn(),
1063 }
1064 .emit_global();
1065 }
1066 }
1067
1068 self.clear_table_planner_stats(&query.name);
1069 self.invalidate_result_cache();
1070 let post_alter_columns: Vec<String> = self
1075 .inner
1076 .db
1077 .collection_contract(&query.name)
1078 .map(|contract| {
1079 contract
1080 .declared_columns
1081 .iter()
1082 .map(|col| col.name.clone())
1083 .collect()
1084 })
1085 .unwrap_or_default();
1086 self.schema_vocabulary_apply(
1087 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1088 collection: query.name.clone(),
1089 columns: post_alter_columns,
1090 type_tags: Vec::new(),
1091 description: None,
1092 },
1093 );
1094
1095 let message = if messages.is_empty() {
1096 format!("table '{}' altered (no operations)", query.name)
1097 } else {
1098 format!("table '{}' altered: {}", query.name, messages.join(", "))
1099 };
1100
1101 Ok(RuntimeQueryResult::ok_message(
1102 raw_query.to_string(),
1103 &message,
1104 "alter",
1105 ))
1106 }
1107
1108 pub fn execute_explain_alter(
1115 &self,
1116 raw_query: &str,
1117 query: &ExplainAlterQuery,
1118 ) -> RedDBResult<RuntimeQueryResult> {
1119 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1123
1124 let current_contract = self.inner.db.collection_contract(&query.target.name);
1125
1126 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1127 .as_ref()
1128 .map(|c| c.declared_columns.clone())
1129 .unwrap_or_default();
1130
1131 let diff = super::schema_diff::compute_column_diff(
1132 &query.target.name,
1133 ¤t_columns,
1134 &query.target.columns,
1135 );
1136
1137 let rendered = match query.format {
1138 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1139 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1140 };
1141
1142 let format_label = match query.format {
1143 ExplainFormat::Sql => "sql",
1144 ExplainFormat::Json => "json",
1145 };
1146
1147 let columns = vec![
1148 "table".to_string(),
1149 "format".to_string(),
1150 "diff".to_string(),
1151 ];
1152 let row = vec![
1153 ("table".to_string(), Value::text(query.target.name.clone())),
1154 ("format".to_string(), Value::text(format_label.to_string())),
1155 ("diff".to_string(), Value::text(rendered)),
1156 ];
1157
1158 Ok(RuntimeQueryResult::ok_records(
1159 raw_query.to_string(),
1160 columns,
1161 vec![row],
1162 "explain",
1163 ))
1164 }
1165
1166 pub fn execute_create_index(
1171 &self,
1172 raw_query: &str,
1173 query: &CreateIndexQuery,
1174 ) -> RedDBResult<RuntimeQueryResult> {
1175 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1176 let store = self.inner.db.store();
1177
1178 let manager = store
1180 .get_collection(&query.table)
1181 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1182
1183 let method_kind = match query.method {
1184 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1185 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1186 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1187 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1188 };
1189
1190 let entities = manager.query_all(|_| true);
1200 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1201 entities
1202 .iter()
1203 .map(|e| {
1204 let fields = match &e.data {
1205 crate::storage::EntityData::Row(row) => {
1206 if let Some(ref named) = row.named {
1207 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1208 } else if let Some(ref schema) = row.schema {
1209 schema
1213 .iter()
1214 .zip(row.columns.iter())
1215 .map(|(k, v)| (k.clone(), v.clone()))
1216 .collect()
1217 } else {
1218 Vec::new()
1219 }
1220 }
1221 crate::storage::EntityData::Node(node) => node
1222 .properties
1223 .iter()
1224 .map(|(k, v)| (k.clone(), v.clone()))
1225 .collect(),
1226 _ => Vec::new(),
1227 };
1228 (e.id, fields)
1229 })
1230 .collect();
1231
1232 let indexed_count = self
1234 .inner
1235 .index_store
1236 .create_index(
1237 &query.name,
1238 &query.table,
1239 &query.columns,
1240 method_kind,
1241 query.unique,
1242 &entity_fields,
1243 )
1244 .map_err(RedDBError::Internal)?;
1245
1246 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1247 &query.table,
1248 &entity_fields,
1249 );
1250 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1251 self.invalidate_plan_cache();
1252
1253 self.inner
1255 .index_store
1256 .register(super::index_store::RegisteredIndex {
1257 name: query.name.clone(),
1258 collection: query.table.clone(),
1259 columns: query.columns.clone(),
1260 method: method_kind,
1261 unique: query.unique,
1262 });
1263 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1267 collection: query.table.clone(),
1268 index: query.name.clone(),
1269 columns: query.columns.clone(),
1270 });
1271
1272 let method_str = format!("{}", query.method);
1273 let unique_str = if query.unique { "unique " } else { "" };
1274 let cols = query.columns.join(", ");
1275
1276 Ok(RuntimeQueryResult::ok_message(
1277 raw_query.to_string(),
1278 &format!(
1279 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1280 unique_str, query.name, query.table, cols, method_str, indexed_count
1281 ),
1282 "create",
1283 ))
1284 }
1285
1286 pub fn execute_drop_index(
1290 &self,
1291 raw_query: &str,
1292 query: &DropIndexQuery,
1293 ) -> RedDBResult<RuntimeQueryResult> {
1294 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1295 let store = self.inner.db.store();
1296
1297 if store.get_collection(&query.table).is_none() {
1299 if query.if_exists {
1300 return Ok(RuntimeQueryResult::ok_message(
1301 raw_query.to_string(),
1302 &format!("table '{}' does not exist", query.table),
1303 "drop",
1304 ));
1305 }
1306 return Err(RedDBError::NotFound(format!(
1307 "table '{}' not found",
1308 query.table
1309 )));
1310 }
1311
1312 self.inner.index_store.drop_index(&query.name, &query.table);
1314 self.invalidate_plan_cache();
1315 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1317 collection: query.table.clone(),
1318 index: query.name.clone(),
1319 });
1320
1321 Ok(RuntimeQueryResult::ok_message(
1322 raw_query.to_string(),
1323 &format!("index '{}' dropped from '{}'", query.name, query.table),
1324 "drop",
1325 ))
1326 }
1327
1328 fn execute_drop_typed_collection(
1329 &self,
1330 raw_query: &str,
1331 name: &str,
1332 if_exists: bool,
1333 expected_model: CollectionModel,
1334 label: &str,
1335 ) -> RedDBResult<RuntimeQueryResult> {
1336 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1337 if is_system_schema_name(name) {
1338 return Err(RedDBError::Query("system schema is read-only".to_string()));
1339 }
1340 let store = self.inner.db.store();
1341 if store.get_collection(name).is_none() {
1342 if if_exists {
1343 return Ok(RuntimeQueryResult::ok_message(
1344 raw_query.to_string(),
1345 &format!("{label} '{name}' does not exist"),
1346 "drop",
1347 ));
1348 }
1349 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1350 }
1351
1352 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1353 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1354 self.drop_collection_storage(raw_query, name, label)
1355 }
1356
1357 pub fn execute_truncate(
1358 &self,
1359 raw_query: &str,
1360 query: &TruncateQuery,
1361 ) -> RedDBResult<RuntimeQueryResult> {
1362 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1363 if is_system_schema_name(&query.name) {
1364 return Err(RedDBError::Query("system schema is read-only".to_string()));
1365 }
1366
1367 let label = query
1368 .model
1369 .map(polymorphic_resolver::model_name)
1370 .unwrap_or("collection");
1371 let store = self.inner.db.store();
1372 if store.get_collection(&query.name).is_none() {
1373 if query.if_exists {
1374 return Ok(RuntimeQueryResult::ok_message(
1375 raw_query.to_string(),
1376 &format!("{label} '{}' does not exist", query.name),
1377 "truncate",
1378 ));
1379 }
1380 return Err(RedDBError::NotFound(format!(
1381 "{label} '{}' not found",
1382 query.name
1383 )));
1384 }
1385
1386 let actual =
1387 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1388 if let Some(expected) = query.model {
1389 polymorphic_resolver::ensure_model_match(expected, actual)?;
1390 }
1391
1392 if actual == CollectionModel::Queue {
1393 return self.execute_queue_command(
1394 raw_query,
1395 &QueueCommand::Purge {
1396 queue: query.name.clone(),
1397 },
1398 );
1399 }
1400
1401 let affected = self.truncate_collection_entities(&query.name)?;
1403 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1405 self.inner.db.invalidate_vector_index(&query.name);
1406 self.clear_table_planner_stats(&query.name);
1407 self.invalidate_result_cache();
1408
1409 Ok(RuntimeQueryResult::ok_message(
1410 raw_query.to_string(),
1411 &format!(
1412 "{affected} entities truncated from {label} '{}'",
1413 query.name
1414 ),
1415 "truncate",
1416 ))
1417 }
1418
1419 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1420 let store = self.inner.db.store();
1421 let Some(manager) = store.get_collection(name) else {
1422 return Ok(0);
1423 };
1424 let entities = manager.query_all(|_| true);
1425 if entities.is_empty() {
1426 return Ok(0);
1427 }
1428
1429 for entity in &entities {
1430 let fields = entity_index_fields(&entity.data);
1431 self.inner
1432 .index_store
1433 .index_entity_delete(name, entity.id, &fields)
1434 .map_err(RedDBError::Internal)?;
1435 }
1436
1437 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1438 let deleted_ids = store
1439 .delete_batch(name, &ids)
1440 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1441 for id in &deleted_ids {
1442 store.context_index().remove_entity(*id);
1443 }
1444 Ok(deleted_ids.len() as u64)
1445 }
1446
1447 fn drop_collection_storage(
1448 &self,
1449 raw_query: &str,
1450 name: &str,
1451 label: &str,
1452 ) -> RedDBResult<RuntimeQueryResult> {
1453 let store = self.inner.db.store();
1454
1455 let final_count = store
1458 .get_collection(name)
1459 .map(|manager| manager.query_all(|_| true).len() as u64)
1460 .unwrap_or(0);
1461 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1462 self,
1463 name,
1464 final_count,
1465 )?;
1466
1467 let orphaned_indices: Vec<String> = self
1468 .inner
1469 .index_store
1470 .list_indices(name)
1471 .into_iter()
1472 .map(|index| index.name)
1473 .collect();
1474 for index_name in &orphaned_indices {
1475 self.inner.index_store.drop_index(index_name, name);
1476 }
1477
1478 store
1479 .drop_collection(name)
1480 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1481 self.inner.db.invalidate_vector_index(name);
1482 self.inner.db.clear_collection_default_ttl_ms(name);
1483 self.inner
1484 .db
1485 .remove_collection_contract(name)
1486 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1487 self.clear_table_planner_stats(name);
1488 self.invalidate_result_cache();
1489 if let Some(store) = self.inner.auth_store.read().clone() {
1490 store.invalidate_visible_collections_cache();
1491 }
1492 self.inner
1493 .db
1494 .persist_metadata()
1495 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1496 self.schema_vocabulary_apply(
1497 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1498 collection: name.to_string(),
1499 },
1500 );
1501
1502 Ok(RuntimeQueryResult::ok_message(
1503 raw_query.to_string(),
1504 &format!("{label} '{name}' dropped"),
1505 "drop",
1506 ))
1507 }
1508}
1509
/// Returns `true` when `name` is exactly `red`, or starts with `red.` or
/// `__red_schema_`. The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const SYSTEM_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    if name == "red" {
        return true;
    }
    SYSTEM_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1513
1514fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1515 match data {
1516 EntityData::Row(row) => {
1517 if let Some(ref named) = row.named {
1518 named
1519 .iter()
1520 .map(|(key, value)| (key.clone(), value.clone()))
1521 .collect()
1522 } else if let Some(ref schema) = row.schema {
1523 schema
1524 .iter()
1525 .zip(row.columns.iter())
1526 .map(|(key, value)| (key.clone(), value.clone()))
1527 .collect()
1528 } else {
1529 Vec::new()
1530 }
1531 }
1532 EntityData::Node(node) => node
1533 .properties
1534 .iter()
1535 .map(|(key, value)| (key.clone(), value.clone()))
1536 .collect(),
1537 _ => Vec::new(),
1538 }
1539}
1540
1541fn collection_contract_from_create_table(
1542 query: &CreateTableQuery,
1543) -> RedDBResult<crate::physical::CollectionContract> {
1544 let now = current_unix_ms();
1545 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1546 .columns
1547 .iter()
1548 .map(declared_column_contract_from_ddl)
1549 .collect();
1550 if query.timestamps {
1551 declared_columns.push(crate::physical::DeclaredColumnContract {
1555 name: "created_at".to_string(),
1556 data_type: "BIGINT".to_string(),
1557 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1558 not_null: true,
1559 default: None,
1560 compress: None,
1561 unique: false,
1562 primary_key: false,
1563 enum_variants: Vec::new(),
1564 array_element: None,
1565 decimal_precision: None,
1566 });
1567 declared_columns.push(crate::physical::DeclaredColumnContract {
1568 name: "updated_at".to_string(),
1569 data_type: "BIGINT".to_string(),
1570 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1571 not_null: true,
1572 default: None,
1573 compress: None,
1574 unique: false,
1575 primary_key: false,
1576 enum_variants: Vec::new(),
1577 array_element: None,
1578 decimal_precision: None,
1579 });
1580 }
1581 Ok(crate::physical::CollectionContract {
1582 name: query.name.clone(),
1583 declared_model: crate::catalog::CollectionModel::Table,
1584 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1585 origin: crate::physical::ContractOrigin::Explicit,
1586 version: 1,
1587 created_at_unix_ms: now,
1588 updated_at_unix_ms: now,
1589 default_ttl_ms: query.default_ttl_ms,
1590 vector_dimension: None,
1591 vector_metric: None,
1592 context_index_fields: query.context_index_fields.clone(),
1593 declared_columns,
1594 table_def: Some(build_table_def_from_create_table(query)?),
1595 timestamps_enabled: query.timestamps,
1596 context_index_enabled: query.context_index_enabled
1597 || !query.context_index_fields.is_empty(),
1598 metrics_raw_retention_ms: None,
1599 metrics_rollup_policies: Vec::new(),
1600 metrics_tenant_identity: None,
1601 metrics_namespace: None,
1602 append_only: query.append_only,
1603 subscriptions: query.subscriptions.clone(),
1604 session_key: None,
1605 session_gap_ms: None,
1606 retention_duration_ms: None,
1607 })
1608}
1609
1610fn default_collection_contract_for_existing_table(
1611 name: &str,
1612) -> crate::physical::CollectionContract {
1613 let now = current_unix_ms();
1614 crate::physical::CollectionContract {
1615 name: name.to_string(),
1616 declared_model: crate::catalog::CollectionModel::Table,
1617 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1618 origin: crate::physical::ContractOrigin::Explicit,
1619 version: 0,
1620 created_at_unix_ms: now,
1621 updated_at_unix_ms: now,
1622 default_ttl_ms: None,
1623 vector_dimension: None,
1624 vector_metric: None,
1625 context_index_fields: Vec::new(),
1626 declared_columns: Vec::new(),
1627 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1628 timestamps_enabled: false,
1629 context_index_enabled: false,
1630 metrics_raw_retention_ms: None,
1631 metrics_rollup_policies: Vec::new(),
1632 metrics_tenant_identity: None,
1633 metrics_namespace: None,
1634 append_only: false,
1635 subscriptions: Vec::new(),
1636 session_key: None,
1637 session_gap_ms: None,
1638 retention_duration_ms: None,
1639 }
1640}
1641
1642fn keyed_collection_contract(
1643 name: &str,
1644 model: crate::catalog::CollectionModel,
1645) -> crate::physical::CollectionContract {
1646 let now = current_unix_ms();
1647 crate::physical::CollectionContract {
1648 name: name.to_string(),
1649 declared_model: model,
1650 schema_mode: crate::catalog::SchemaMode::Dynamic,
1651 origin: crate::physical::ContractOrigin::Explicit,
1652 version: 1,
1653 created_at_unix_ms: now,
1654 updated_at_unix_ms: now,
1655 default_ttl_ms: None,
1656 vector_dimension: None,
1657 vector_metric: None,
1658 context_index_fields: Vec::new(),
1659 declared_columns: Vec::new(),
1660 table_def: None,
1661 timestamps_enabled: false,
1662 context_index_enabled: false,
1663 metrics_raw_retention_ms: None,
1664 metrics_rollup_policies: Vec::new(),
1665 metrics_tenant_identity: None,
1666 metrics_namespace: None,
1667 append_only: false,
1668 subscriptions: Vec::new(),
1669 session_key: None,
1670 session_gap_ms: None,
1671 retention_duration_ms: None,
1672 }
1673}
1674
1675fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1676 let now = current_unix_ms();
1677 crate::physical::CollectionContract {
1678 name: query.name.clone(),
1679 declared_model: crate::catalog::CollectionModel::Metrics,
1680 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1681 origin: crate::physical::ContractOrigin::Explicit,
1682 version: 1,
1683 created_at_unix_ms: now,
1684 updated_at_unix_ms: now,
1685 default_ttl_ms: query.default_ttl_ms,
1686 vector_dimension: None,
1687 vector_metric: None,
1688 context_index_fields: Vec::new(),
1689 declared_columns: Vec::new(),
1690 table_def: None,
1691 timestamps_enabled: false,
1692 context_index_enabled: false,
1693 metrics_raw_retention_ms: query.default_ttl_ms,
1694 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1695 metrics_tenant_identity: Some(
1696 query
1697 .tenant_by
1698 .clone()
1699 .unwrap_or_else(|| "current_tenant".to_string()),
1700 ),
1701 metrics_namespace: Some("default".to_string()),
1702 append_only: true,
1703 subscriptions: Vec::new(),
1704 session_key: None,
1705 session_gap_ms: None,
1706 retention_duration_ms: None,
1707 }
1708}
1709
1710fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1711 let now = current_unix_ms();
1712 crate::physical::CollectionContract {
1713 name: query.name.clone(),
1714 declared_model: crate::catalog::CollectionModel::Vector,
1715 schema_mode: crate::catalog::SchemaMode::Dynamic,
1716 origin: crate::physical::ContractOrigin::Explicit,
1717 version: 1,
1718 created_at_unix_ms: now,
1719 updated_at_unix_ms: now,
1720 default_ttl_ms: None,
1721 vector_dimension: Some(query.dimension),
1722 vector_metric: Some(query.metric),
1723 context_index_fields: Vec::new(),
1724 declared_columns: Vec::new(),
1725 table_def: None,
1726 timestamps_enabled: false,
1727 context_index_enabled: false,
1728 metrics_raw_retention_ms: None,
1729 metrics_rollup_policies: Vec::new(),
1730 metrics_tenant_identity: None,
1731 metrics_namespace: None,
1732 append_only: false,
1733 subscriptions: Vec::new(),
1734 session_key: None,
1735 session_gap_ms: None,
1736 retention_duration_ms: None,
1737 }
1738}
1739
1740fn declared_column_contract_from_ddl(
1741 column: &CreateColumnDef,
1742) -> crate::physical::DeclaredColumnContract {
1743 crate::physical::DeclaredColumnContract {
1744 name: column.name.clone(),
1745 data_type: column.data_type.clone(),
1746 sql_type: Some(column.sql_type.clone()),
1747 not_null: column.not_null,
1748 default: column.default.clone(),
1749 compress: column.compress,
1750 unique: column.unique,
1751 primary_key: column.primary_key,
1752 enum_variants: column.enum_variants.clone(),
1753 array_element: column.array_element.clone(),
1754 decimal_precision: column.decimal_precision,
1755 }
1756}
1757
/// Applies a list of `ALTER` operations to `contract` in place, in order.
///
/// Column operations keep `declared_columns` and `table_def` (columns,
/// primary key, constraints, indexes) in step. Subscription, append-only, and
/// retention operations update the corresponding contract fields. Several
/// operation kinds leave the contract untouched here.
///
/// An empty `TableDef` named after the contract is created first when the
/// contract has none.
fn apply_alter_operations_to_contract(
    contract: &mut crate::physical::CollectionContract,
    operations: &[AlterOperation],
) {
    // Guarantee a table definition exists so the column arms can update it.
    if contract.table_def.is_none() {
        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
    }
    for operation in operations {
        match operation {
            AlterOperation::AddColumn(column) => {
                // Add the declared column only if no column of that name exists.
                if !contract
                    .declared_columns
                    .iter()
                    .any(|existing| existing.name == column.name)
                {
                    contract
                        .declared_columns
                        .push(declared_column_contract_from_ddl(column));
                }
                if let Some(table_def) = contract.table_def.as_mut() {
                    if table_def.get_column(&column.name).is_none() {
                        // NOTE(review): a conversion error is silently ignored
                        // here, leaving `table_def` without the column even
                        // though `declared_columns` may already contain it.
                        if let Ok(column_def) = column_def_from_ddl(column) {
                            // Constraints are named `pk_<col>`, `uniq_<col>`,
                            // and `not_null_<col>`.
                            if column.primary_key {
                                table_def.primary_key.push(column.name.clone());
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("pk_{}", column.name),
                                        crate::storage::schema::ConstraintType::PrimaryKey,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            if column.unique {
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("uniq_{}", column.name),
                                        crate::storage::schema::ConstraintType::Unique,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            if column.not_null {
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("not_null_{}", column.name),
                                        crate::storage::schema::ConstraintType::NotNull,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            table_def.columns.push(column_def);
                        }
                    }
                }
            }
            AlterOperation::DropColumn(name) => {
                contract
                    .declared_columns
                    .retain(|column| column.name != *name);
                if let Some(table_def) = contract.table_def.as_mut() {
                    if let Some(index) = table_def.column_index(name) {
                        table_def.columns.remove(index);
                    }
                    // Also drop the column from the primary key and remove any
                    // constraint or index that references it.
                    table_def.primary_key.retain(|column| column != name);
                    table_def.constraints.retain(|constraint| {
                        !constraint.columns.iter().any(|column| column == name)
                    });
                    table_def
                        .indexes
                        .retain(|index| !index.columns.iter().any(|column| column == name));
                }
            }
            AlterOperation::RenameColumn { from, to } => {
                // Skip the rename entirely when the target name is already declared.
                if contract
                    .declared_columns
                    .iter()
                    .any(|column| column.name == *to)
                {
                    continue;
                }
                if let Some(column) = contract
                    .declared_columns
                    .iter_mut()
                    .find(|column| column.name == *from)
                {
                    column.name = to.clone();
                }
                if let Some(table_def) = contract.table_def.as_mut() {
                    // Rewrite every reference to the old name: the column
                    // itself, the primary key, constraint columns and
                    // referenced columns, and index columns.
                    if let Some(column) = table_def
                        .columns
                        .iter_mut()
                        .find(|column| column.name == *from)
                    {
                        column.name = to.clone();
                    }
                    for primary_key in &mut table_def.primary_key {
                        if *primary_key == *from {
                            *primary_key = to.clone();
                        }
                    }
                    for constraint in &mut table_def.constraints {
                        for column in &mut constraint.columns {
                            if *column == *from {
                                *column = to.clone();
                            }
                        }
                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
                            for column in ref_columns {
                                if *column == *from {
                                    *column = to.clone();
                                }
                            }
                        }
                    }
                    for index in &mut table_def.indexes {
                        for column in &mut index.columns {
                            if *column == *from {
                                *column = to.clone();
                            }
                        }
                    }
                }
            }
            // The following operations change nothing on the contract in this
            // function.
            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
            AlterOperation::SetAppendOnly(on) => {
                contract.append_only = *on;
            }
            AlterOperation::SetVersioned(_) => {}
            AlterOperation::EnableEvents(subscription) => {
                // Force the subscription to originate from this collection and
                // be enabled; replace an existing one with the same target queue.
                let mut subscription = subscription.clone();
                subscription.source = contract.name.clone();
                subscription.enabled = true;
                if let Some(existing) = contract
                    .subscriptions
                    .iter_mut()
                    .find(|existing| existing.target_queue == subscription.target_queue)
                {
                    *existing = subscription;
                } else {
                    contract.subscriptions.push(subscription);
                }
            }
            AlterOperation::DisableEvents => {
                // Subscriptions are kept but all marked disabled.
                for subscription in &mut contract.subscriptions {
                    subscription.enabled = false;
                }
            }
            AlterOperation::AddSubscription { name, descriptor } => {
                // Named subscriptions are matched by name rather than target queue.
                let mut sub = descriptor.clone();
                sub.name = name.clone();
                sub.source = contract.name.clone();
                sub.enabled = true;
                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
                {
                    *existing = sub;
                } else {
                    contract.subscriptions.push(sub);
                }
            }
            AlterOperation::DropSubscription { name } => {
                contract.subscriptions.retain(|s| s.name != *name);
            }
            // Signer changes do not touch the contract here.
            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
            AlterOperation::SetRetention { duration_ms } => {
                contract.retention_duration_ms = Some(*duration_ms);
            }
            AlterOperation::UnsetRetention => {
                contract.retention_duration_ms = None;
            }
        }
    }
}
1945
1946pub(crate) fn retention_timestamp_column_exists(
1951 contract: &crate::physical::CollectionContract,
1952) -> bool {
1953 if contract.timestamps_enabled {
1954 return true;
1955 }
1956 if matches!(
1957 contract.declared_model,
1958 crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
1959 ) {
1960 return true;
1964 }
1965 contract
1966 .declared_columns
1967 .iter()
1968 .any(|column| is_temporal_data_type(&column.data_type))
1969}
1970
/// Returns `true` when `data_type` names one of the temporal types
/// `TIMESTAMP`, `TIMESTAMPMS`, `TIMESTAMP_MS`, `DATETIME`, or `DATE`,
/// compared ASCII case-insensitively and without trimming.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];

    TEMPORAL_TYPES
        .iter()
        .any(|candidate| data_type.eq_ignore_ascii_case(candidate))
}
1978
1979fn validate_event_subscriptions(
1980 runtime: &RedDBRuntime,
1981 source: &str,
1982 subscriptions: &[crate::catalog::SubscriptionDescriptor],
1983) -> RedDBResult<()> {
1984 for subscription in subscriptions
1985 .iter()
1986 .filter(|subscription| subscription.enabled)
1987 {
1988 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1989 return Err(RedDBError::Query(
1990 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1991 ));
1992 }
1993 validate_subscription_auth(runtime, source, subscription)?;
1994 if subscription.target_queue == source
1995 || subscription_would_create_cycle(
1996 &runtime.inner.db,
1997 source,
1998 &subscription.target_queue,
1999 )
2000 {
2001 return Err(RedDBError::Query(
2002 "subscription would create cycle".to_string(),
2003 ));
2004 }
2005 audit_subscription_redact_gap(runtime, source, subscription);
2006 }
2007 Ok(())
2008}
2009
2010fn validate_subscription_auth(
2011 runtime: &RedDBRuntime,
2012 source: &str,
2013 subscription: &crate::catalog::SubscriptionDescriptor,
2014) -> RedDBResult<()> {
2015 let auth_store = match runtime.inner.auth_store.read().clone() {
2016 Some(store) => store,
2017 None => return Ok(()),
2018 };
2019 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2020 Some(identity) => identity,
2021 None => return Ok(()),
2022 };
2023 let tenant = crate::runtime::impl_core::current_tenant();
2024 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2025
2026 if auth_store.iam_authorization_enabled() {
2027 let ctx = crate::auth::policies::EvalContext {
2028 principal_tenant: tenant.clone(),
2029 current_tenant: tenant.clone(),
2030 peer_ip: None,
2031 mfa_present: false,
2032 now_ms: crate::auth::now_ms(),
2033 principal_is_admin_role: role == crate::auth::Role::Admin,
2034 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2035 principal_is_platform_scoped: principal.tenant.is_none(),
2036 };
2037 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2038 if let Some(t) = tenant.as_deref() {
2039 source_resource = source_resource.with_tenant(t.to_string());
2040 }
2041 if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
2042 return Err(RedDBError::Query(format!(
2043 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2044 principal, source_resource.kind, source_resource.name
2045 )));
2046 }
2047
2048 let mut target_resource =
2049 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2050 if let Some(t) = tenant.as_deref() {
2051 target_resource = target_resource.with_tenant(t.to_string());
2052 }
2053 if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
2054 return Err(RedDBError::Query(format!(
2055 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2056 principal, target_resource.kind, target_resource.name
2057 )));
2058 }
2059 return Ok(());
2060 }
2061
2062 let ctx = crate::auth::privileges::AuthzContext {
2063 principal: &username,
2064 effective_role: role,
2065 tenant: tenant.as_deref(),
2066 };
2067 auth_store
2068 .check_grant(
2069 &ctx,
2070 crate::auth::privileges::Action::Select,
2071 &crate::auth::privileges::Resource::table_from_name(source),
2072 )
2073 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2074 auth_store
2075 .check_grant(
2076 &ctx,
2077 crate::auth::privileges::Action::Insert,
2078 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2079 )
2080 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2081 Ok(())
2082}
2083
2084fn audit_subscription_redact_gap(
2085 runtime: &RedDBRuntime,
2086 source: &str,
2087 subscription: &crate::catalog::SubscriptionDescriptor,
2088) {
2089 let auth_store = match runtime.inner.auth_store.read().clone() {
2090 Some(store) if store.iam_authorization_enabled() => store,
2091 _ => return,
2092 };
2093 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2094 Some(identity) => identity,
2095 None => return,
2096 };
2097 let tenant = crate::runtime::impl_core::current_tenant();
2098 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2099 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2100 if missing.is_empty() {
2101 return;
2102 }
2103
2104 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2105 tracing::warn!(
2106 target: "reddb::operator",
2107 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2108 source,
2109 subscription.target_queue,
2110 columns
2111 );
2112 let mut event = AuditEvent::builder("subscription_redact_gap")
2113 .principal(username)
2114 .source(AuditAuthSource::System)
2115 .resource(format!(
2116 "subscription:{}->{}",
2117 source, subscription.target_queue
2118 ))
2119 .outcome(Outcome::Success)
2120 .field(AuditFieldEscaper::field("source", source))
2121 .field(AuditFieldEscaper::field(
2122 "target_queue",
2123 subscription.target_queue.clone(),
2124 ))
2125 .field(AuditFieldEscaper::field(
2126 "subscription",
2127 subscription.name.clone(),
2128 ))
2129 .field(AuditFieldEscaper::field("columns", columns))
2130 .field(AuditFieldEscaper::field("role", role.as_str()));
2131 if let Some(t) = tenant {
2132 event = event.tenant(t);
2133 }
2134 runtime.inner.audit_log.record_event(event.build());
2135}
2136
2137fn subscription_redact_gap_columns(
2138 auth_store: &crate::auth::store::AuthStore,
2139 principal: &crate::auth::UserId,
2140 source: &str,
2141 subscription: &crate::catalog::SubscriptionDescriptor,
2142) -> BTreeSet<String> {
2143 let redacted: HashSet<String> = subscription
2144 .redact_fields
2145 .iter()
2146 .map(|field| field.to_ascii_lowercase())
2147 .collect();
2148 auth_store
2149 .effective_policies(principal)
2150 .iter()
2151 .flat_map(|policy| policy.statements.iter())
2152 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2153 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2154 .flat_map(|statement| statement.resources.iter())
2155 .filter_map(|resource| denied_column_for_source(resource, source))
2156 .filter(|column| !redact_covers_column(&redacted, source, column))
2157 .collect()
2158}
2159
2160fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2161 match pattern {
2162 crate::auth::policies::ActionPattern::Wildcard => true,
2163 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2164 crate::auth::policies::ActionPattern::Prefix(prefix) => {
2165 "select".len() > prefix.len() + 1
2166 && "select".starts_with(prefix)
2167 && "select".as_bytes()[prefix.len()] == b':'
2168 }
2169 }
2170}
2171
2172fn denied_column_for_source(
2173 resource: &crate::auth::policies::ResourcePattern,
2174 source: &str,
2175) -> Option<String> {
2176 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2177 return None;
2178 };
2179 if kind != "column" {
2180 return None;
2181 }
2182 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2183 (column.table_resource_name() == source).then_some(column.column)
2184}
2185
/// Reports whether the redact set covers `column` of `source`.
///
/// The set covers the column when it contains `"*"`, the lower-cased column
/// name, or the lower-cased `source.column` form. Entries of `redacted` are
/// compared as-is, so callers are expected to have lower-cased them.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }

    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }

    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
2191
2192fn subscription_would_create_cycle(
2193 db: &crate::storage::unified::devx::RedDB,
2194 source: &str,
2195 target: &str,
2196) -> bool {
2197 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2198 for contract in db.collection_contracts() {
2199 for subscription in contract
2200 .subscriptions
2201 .into_iter()
2202 .filter(|subscription| subscription.enabled)
2203 {
2204 graph
2205 .entry(subscription.source)
2206 .or_default()
2207 .push(subscription.target_queue);
2208 }
2209 }
2210 graph
2211 .entry(source.to_string())
2212 .or_default()
2213 .push(target.to_string());
2214
2215 let mut stack = vec![target.to_string()];
2216 let mut seen = HashSet::new();
2217 while let Some(node) = stack.pop() {
2218 if node == source {
2219 return true;
2220 }
2221 if !seen.insert(node.clone()) {
2222 continue;
2223 }
2224 if let Some(next) = graph.get(&node) {
2225 stack.extend(next.iter().cloned());
2226 }
2227 }
2228 false
2229}
2230
/// Crate-visible entry point that forwards to the private
/// `ensure_event_target_queue` with the same arguments and result.
pub(crate) fn ensure_event_target_queue_pub(
    runtime: &RedDBRuntime,
    queue: &str,
) -> RedDBResult<()> {
    ensure_event_target_queue(runtime, queue)
}
2237
2238fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2239 let store = runtime.inner.db.store();
2240 if store.get_collection(queue).is_some() {
2241 return Ok(());
2242 }
2243 store
2244 .create_collection(queue)
2245 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2246 runtime
2247 .inner
2248 .db
2249 .save_collection_contract(event_queue_collection_contract(queue))
2250 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2251 store.set_config_tree(
2252 &format!("queue.{queue}.mode"),
2253 &crate::serde_json::Value::String("fanout".to_string()),
2254 );
2255 Ok(())
2256}
2257
2258fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2259 let now = current_unix_ms();
2260 crate::physical::CollectionContract {
2261 name: queue.to_string(),
2262 declared_model: crate::catalog::CollectionModel::Queue,
2263 schema_mode: crate::catalog::SchemaMode::Dynamic,
2264 origin: crate::physical::ContractOrigin::Implicit,
2265 version: 1,
2266 created_at_unix_ms: now,
2267 updated_at_unix_ms: now,
2268 default_ttl_ms: None,
2269 vector_dimension: None,
2270 vector_metric: None,
2271 context_index_fields: Vec::new(),
2272 declared_columns: Vec::new(),
2273 table_def: None,
2274 timestamps_enabled: false,
2275 context_index_enabled: false,
2276 metrics_raw_retention_ms: None,
2277 metrics_rollup_policies: Vec::new(),
2278 metrics_tenant_identity: None,
2279 metrics_namespace: None,
2280 append_only: true,
2281 subscriptions: Vec::new(),
2282 session_key: None,
2283 session_gap_ms: None,
2284 retention_duration_ms: None,
2285 }
2286}
2287
2288fn build_table_def_from_create_table(
2289 query: &CreateTableQuery,
2290) -> RedDBResult<crate::storage::schema::TableDef> {
2291 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2292 for column in &query.columns {
2293 if column.primary_key {
2294 table.primary_key.push(column.name.clone());
2295 table.constraints.push(
2296 crate::storage::schema::Constraint::new(
2297 format!("pk_{}", column.name),
2298 crate::storage::schema::ConstraintType::PrimaryKey,
2299 )
2300 .on_columns(vec![column.name.clone()]),
2301 );
2302 }
2303 if column.unique {
2304 table.constraints.push(
2305 crate::storage::schema::Constraint::new(
2306 format!("uniq_{}", column.name),
2307 crate::storage::schema::ConstraintType::Unique,
2308 )
2309 .on_columns(vec![column.name.clone()]),
2310 );
2311 }
2312 if column.not_null {
2313 table.constraints.push(
2314 crate::storage::schema::Constraint::new(
2315 format!("not_null_{}", column.name),
2316 crate::storage::schema::ConstraintType::NotNull,
2317 )
2318 .on_columns(vec![column.name.clone()]),
2319 );
2320 }
2321 table.columns.push(column_def_from_ddl(column)?);
2322 }
2323 if query.timestamps {
2328 table.columns.push(
2329 crate::storage::schema::ColumnDef::new(
2330 "created_at".to_string(),
2331 crate::storage::schema::DataType::UnsignedInteger,
2332 )
2333 .not_null(),
2334 );
2335 table.columns.push(
2336 crate::storage::schema::ColumnDef::new(
2337 "updated_at".to_string(),
2338 crate::storage::schema::DataType::UnsignedInteger,
2339 )
2340 .not_null(),
2341 );
2342 table.constraints.push(
2343 crate::storage::schema::Constraint::new(
2344 "not_null_created_at".to_string(),
2345 crate::storage::schema::ConstraintType::NotNull,
2346 )
2347 .on_columns(vec!["created_at".to_string()]),
2348 );
2349 table.constraints.push(
2350 crate::storage::schema::Constraint::new(
2351 "not_null_updated_at".to_string(),
2352 crate::storage::schema::ConstraintType::NotNull,
2353 )
2354 .on_columns(vec!["updated_at".to_string()]),
2355 );
2356 }
2357 table
2358 .validate()
2359 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2360 Ok(table)
2361}
2362
2363fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2364 let data_type = resolve_declared_data_type(&column.data_type)
2365 .map_err(|err| RedDBError::Query(err.to_string()))?;
2366 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2367 if column.not_null {
2368 column_def = column_def.not_null();
2369 }
2370 if let Some(default) = &column.default {
2371 column_def = column_def.with_default(default.as_bytes().to_vec());
2372 }
2373 if column.compress.unwrap_or(0) > 0 {
2374 column_def = column_def.compressed();
2375 }
2376 if !column.enum_variants.is_empty() {
2377 column_def = column_def.with_variants(column.enum_variants.clone());
2378 }
2379 if let Some(precision) = column.decimal_precision {
2380 column_def = column_def.with_precision(precision);
2381 }
2382 if let Some(element_type) = &column.array_element {
2383 column_def = column_def.with_element_type(
2384 resolve_declared_data_type(element_type)
2385 .map_err(|err| RedDBError::Query(err.to_string()))?,
2386 );
2387 }
2388 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2389 if column.unique {
2390 column_def = column_def.with_metadata("unique", "true");
2391 }
2392 if column.primary_key {
2393 column_def = column_def.with_metadata("primary_key", "true");
2394 }
2395 Ok(column_def)
2396}
2397
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time earlier than the epoch, `0` is
/// returned instead of an error.
fn current_unix_ms() -> u128 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => 0,
    }
}
2404
2405#[cfg(test)]
2406mod tests {
2407 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2408 use crate::auth::store::{AuthStore, PrincipalRef};
2409 use crate::auth::UserId;
2410 use crate::auth::{AuthConfig, Role};
2411 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2412 use crate::storage::schema::Value;
2413 use crate::{RedDBOptions, RedDBRuntime};
2414 use std::sync::Arc;
2415
2416 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2417 Policy {
2418 id: id.to_string(),
2419 version: 1,
2420 tenant: None,
2421 created_at: 0,
2422 updated_at: 0,
2423 statements: vec![Statement {
2424 sid: None,
2425 effect: Effect::Allow,
2426 actions: vec![ActionPattern::Exact(action.to_string())],
2427 resources: vec![ResourcePattern::Exact {
2428 kind: "collection".to_string(),
2429 name: collection.to_string(),
2430 }],
2431 condition: None,
2432 }],
2433 }
2434 }
2435
2436 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2437 let store = Arc::new(AuthStore::new(AuthConfig::default()));
2438 *rt.inner.auth_store.write() = Some(store.clone());
2439 store
2440 }
2441
2442 #[test]
2443 fn drop_denied_without_iam_policy() {
2444 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2445 rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2446 let store = wire_auth_store(&rt);
2447 let select_only = Policy {
2449 id: "select-only".to_string(),
2450 version: 1,
2451 tenant: None,
2452 created_at: 0,
2453 updated_at: 0,
2454 statements: vec![Statement {
2455 sid: None,
2456 effect: Effect::Allow,
2457 actions: vec![ActionPattern::Exact("select".to_string())],
2458 resources: vec![ResourcePattern::Wildcard],
2459 condition: None,
2460 }],
2461 };
2462 store.put_policy_internal(select_only).unwrap();
2463 let alice = UserId::from_parts(None, "alice");
2464 store
2465 .attach_policy(PrincipalRef::User(alice), "select-only")
2466 .unwrap();
2467 set_current_auth_identity("alice".to_string(), Role::Write);
2468 let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2469 clear_current_auth_identity();
2470 assert!(
2471 format!("{err}").contains("denied by IAM policy"),
2472 "got: {err}"
2473 );
2474 }
2475
2476 #[test]
2477 fn drop_allowed_with_explicit_iam_policy() {
2478 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2479 rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2480 let store = wire_auth_store(&rt);
2481 let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2482 store.put_policy_internal(policy).unwrap();
2483 let bob = UserId::from_parts(None, "bob");
2484 store
2485 .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2486 .unwrap();
2487 set_current_auth_identity("bob".to_string(), Role::Write);
2488 rt.execute_query("DROP TABLE bar").unwrap();
2489 clear_current_auth_identity();
2490 }
2491
2492 #[test]
2493 fn drop_allowed_with_wildcard_iam_policy() {
2494 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2495 rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2496 let store = wire_auth_store(&rt);
2497 let policy = Policy {
2498 id: "allow-drop-all".to_string(),
2499 version: 1,
2500 tenant: None,
2501 created_at: 0,
2502 updated_at: 0,
2503 statements: vec![Statement {
2504 sid: None,
2505 effect: Effect::Allow,
2506 actions: vec![ActionPattern::Exact("drop".to_string())],
2507 resources: vec![ResourcePattern::Wildcard],
2508 condition: None,
2509 }],
2510 };
2511 store.put_policy_internal(policy).unwrap();
2512 let carl = UserId::from_parts(None, "carl");
2513 store
2514 .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2515 .unwrap();
2516 set_current_auth_identity("carl".to_string(), Role::Write);
2517 rt.execute_query("DROP TABLE baz").unwrap();
2518 clear_current_auth_identity();
2519 }
2520
2521 #[test]
2522 fn truncate_denied_without_iam_policy() {
2523 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2524 rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2525 let store = wire_auth_store(&rt);
2526 let select_only = Policy {
2528 id: "select-only-2".to_string(),
2529 version: 1,
2530 tenant: None,
2531 created_at: 0,
2532 updated_at: 0,
2533 statements: vec![Statement {
2534 sid: None,
2535 effect: Effect::Allow,
2536 actions: vec![ActionPattern::Exact("select".to_string())],
2537 resources: vec![ResourcePattern::Wildcard],
2538 condition: None,
2539 }],
2540 };
2541 store.put_policy_internal(select_only).unwrap();
2542 let dana = UserId::from_parts(None, "dana");
2543 store
2544 .attach_policy(PrincipalRef::User(dana), "select-only-2")
2545 .unwrap();
2546 set_current_auth_identity("dana".to_string(), Role::Write);
2547 let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2548 clear_current_auth_identity();
2549 assert!(
2550 format!("{err}").contains("denied by IAM policy"),
2551 "got: {err}"
2552 );
2553 }
2554
    // TRUNCATE TABLE removes the rows but leaves the table usable: the
    // contract and the previously created index are still present, and new
    // rows can be inserted and queried afterwards.
    #[test]
    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
            .unwrap();
        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
            .unwrap();

        // The result is tagged as a truncate and reports zero affected rows,
        // even though two rows were present.
        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
        assert_eq!(truncated.statement_type, "truncate");
        assert_eq!(truncated.affected_rows, 0);

        let empty = rt.execute_query("SELECT id FROM users").unwrap();
        assert!(empty.result.records.is_empty());

        // The table still accepts writes and serves reads after truncation.
        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
            .unwrap();
        let selected = rt
            .execute_query("SELECT name FROM users WHERE id = 3")
            .unwrap();
        let name = selected.result.records[0].get("name").unwrap();
        assert_eq!(name, &Value::text("cy"));
        // Contract and index metadata survive the truncate.
        assert!(rt.db().collection_contract("users").is_some());
        assert!(rt
            .inner
            .index_store
            .list_indices("users")
            .iter()
            .any(|index| index.name == "idx_users_id"));
    }
2587
    // `TRUNCATE TABLE` against a queue is rejected with a model-mismatch
    // error, while `TRUNCATE COLLECTION` works on the same queue and leaves
    // it empty.
    #[test]
    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE QUEUE tasks").unwrap();
        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();

        // The typed form names the wrong model for this collection.
        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));

        // The untyped form succeeds and the queue length drops to zero.
        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
        assert_eq!(
            len.result.records[0].get("len"),
            Some(&Value::UnsignedInteger(0))
        );
    }
2604
    // Truncating `red.collections` must fail with the "system schema is
    // read-only" error.
    #[test]
    fn truncate_system_schema_is_read_only() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        let err = rt
            .execute_query("TRUNCATE COLLECTION red.collections")
            .unwrap_err();
        assert!(format!("{err}").contains("system schema is read-only"));
    }
2613
2614 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2617 let result = rt
2618 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2619 .expect("peek queue");
2620 result
2621 .result
2622 .records
2623 .iter()
2624 .map(
2625 |record| match record.get("payload").expect("payload column") {
2626 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2627 other => panic!("expected JSON queue payload, got {other:?}"),
2628 },
2629 )
2630 .collect()
2631 }
2632
    // Truncating a table created `WITH EVENTS TO users_events` must leave
    // exactly one `truncate` event in the queue, describing the collection
    // and the number of entities that were removed.
    #[test]
    fn truncate_event_enabled_table_emits_single_truncate_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query(
            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
        )
        .unwrap();

        // Pop whatever is already queued (presumably the insert events) so
        // the count below reflects only the truncate.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("TRUNCATE TABLE users").unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 truncate event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("truncate")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        // Three rows were present when the table was truncated.
        assert_eq!(
            ev.get("entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(3)
        );
        // Envelope fields: only their presence and type are checked.
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));
    }
2679
    // Truncating a table that was created without an events clause succeeds
    // and leaves the table empty.
    // NOTE(review): no queue is inspected here, so "emits nothing" is only
    // covered to the extent that the truncate does not fail.
    #[test]
    fn truncate_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
            .unwrap();
        rt.execute_query("TRUNCATE TABLE plain").unwrap();
        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
        assert!(rows.result.records.is_empty());
    }
2694
    // Dropping a table created `WITH EVENTS TO users_events` must leave
    // exactly one `collection_dropped` event in the queue, and the table must
    // no longer be queryable afterwards.
    #[test]
    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .unwrap();

        // Pop whatever is already queued (presumably the insert events) so
        // the count below reflects only the drop.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("DROP TABLE users").unwrap();

        // The target queue is still readable after its source table is gone.
        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 collection_dropped event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("collection_dropped")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        // Two rows were present when the table was dropped.
        assert_eq!(
            ev.get("final_entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(2)
        );
        // Envelope fields: only their presence and type are checked.
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));

        // Selecting from the dropped table fails; the check only requires the
        // error text to mention the table name.
        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
        assert!(
            format!("{err}").contains("users"),
            "expected not-found error"
        );
    }
2747
    // Dropping a table that was created without an events clause succeeds,
    // and a later SELECT fails with an error that mentions the table name.
    // NOTE(review): no queue is inspected here, so "emits nothing" is only
    // covered to the extent that the drop does not fail.
    #[test]
    fn drop_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("DROP TABLE plain").unwrap();
        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
        assert!(format!("{err}").contains("plain"));
    }
2762
    // With `WITH EVENTS (INSERT)`, an insert followed by an update and a
    // delete of the same row must leave only the insert event in the queue.
    #[test]
    fn ops_filter_insert_only_ignores_update_and_delete() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
        )
        .unwrap();
        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
            .unwrap();
        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();

        let events = queue_payloads(&rt, "items_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 insert event, got {}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }
2796
    // With `WITH EVENTS WHERE status = 'active'`, inserting one active and
    // one inactive row must queue a single insert event whose `after` image
    // carries the active status.
    #[test]
    fn where_filter_skips_rows_that_do_not_match() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
        )
        .unwrap();

        // Matches the subscription predicate.
        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
            .unwrap();
        // Does not match the subscription predicate.
        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
            .unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 event (only active), got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("insert")
        );
        let after = ev.get("after").unwrap().as_object().unwrap();
        assert_eq!(
            after.get("status").and_then(crate::json::Value::as_str),
            Some("active")
        );
    }
2831
    // With both an operation list `(INSERT, UPDATE)` and a
    // `WHERE status = 'active'` predicate, the four statements below must
    // leave exactly one event in the queue, and it must be an insert.
    #[test]
    fn ops_filter_and_where_filter_combined() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
        )
        .unwrap();

        // Insert of an active row: listed operation, predicate matches.
        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
            .unwrap();
        // Insert of an inactive row: predicate does not match.
        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
            .unwrap();
        // Update that turns the row inactive: expected to produce no event
        // (presumably because the updated row no longer matches).
        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
            .unwrap();
        // DELETE is not in the operation list.
        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "items_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 event, got {}: {events:?}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }
2870
    // With `WITH EVENTS (DELETE) WHERE status = 'active'`, deleting one
    // active and one inactive row must queue a single delete event. As the
    // test name says, the predicate has to be evaluated against the row as it
    // was before the delete.
    #[test]
    fn where_filter_on_delete_checks_before_state() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
        )
        .unwrap();

        // INSERT is not in the operation list, so these rows queue nothing.
        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
            .unwrap();

        // Row 1 was active, row 2 was inactive.
        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 delete event, got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("delete")
        );
    }
2901
    // Adding a column to an event-enabled table succeeds: the new column
    // shows up in the contract and an enabled subscription is still present.
    #[test]
    fn alter_add_column_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
            .unwrap();
        // Contract is read after the ALTER, so it reflects the new state.
        let contract = rt.db().collection_contract("users").unwrap();
        assert!(
            contract.declared_columns.iter().any(|c| c.name == "phone"),
            "phone column should be in contract"
        );
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }
2925
2926 #[test]
2929 fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
2930 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2931 rt.execute_query(
2932 "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
2933 )
2934 .unwrap();
2935 rt.execute_query("ALTER TABLE items DROP COLUMN secret")
2937 .unwrap();
2938 let contract = rt.db().collection_contract("items").unwrap();
2939 assert!(
2940 !contract.declared_columns.iter().any(|c| c.name == "secret"),
2941 "secret column should be removed"
2942 );
2943 rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
2945 .unwrap();
2946 assert!(
2948 contract.subscriptions.iter().any(|s| s.enabled),
2949 "subscription should remain enabled"
2950 );
2951 }
2952}