1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Returns the vault KV key that names a collection's own master key.
///
/// The result has the shape `red.vault.<collection>.master_key`; the
/// collection name is inserted verbatim.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 crate::reserved_fields::ensure_no_reserved_public_item_fields(
37 query.columns.iter().map(|column| column.name.as_str()),
38 &format!("table '{}'", query.name),
39 )?;
40 let exists = store.get_collection(&query.name).is_some();
42 if exists {
43 if query.if_not_exists {
44 return Ok(RuntimeQueryResult::ok_message(
45 raw_query.to_string(),
46 &format!("table '{}' already exists", query.name),
47 "create",
48 ));
49 }
50 return Err(RedDBError::Query(format!(
51 "table '{}' already exists",
52 query.name
53 )));
54 }
55
56 let contract = collection_contract_from_create_table(query)?;
59 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60 store
62 .create_collection(&query.name)
63 .map_err(|err| RedDBError::Internal(err.to_string()))?;
64 for subscription in &contract.subscriptions {
65 ensure_event_target_queue(self, &subscription.target_queue)?;
66 }
67 if let Some(default_ttl_ms) = query.default_ttl_ms {
68 self.inner
69 .db
70 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71 }
72 self.inner
73 .db
74 .save_collection_contract(contract)
75 .map_err(|err| RedDBError::Internal(err.to_string()))?;
76 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77 store.set_config_tree(
78 &format!("red.collection_tenants.{}", query.name),
79 &crate::serde_json::Value::String(tenant_id),
80 );
81 }
82 self.inner
83 .db
84 .persist_metadata()
85 .map_err(|err| RedDBError::Internal(err.to_string()))?;
86 self.refresh_table_planner_stats(&query.name);
87 self.invalidate_result_cache();
88 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99 if let Some(spec) = &query.partition_by {
106 let kind_str = match spec.kind {
107 crate::storage::query::ast::PartitionKind::Range => "range",
108 crate::storage::query::ast::PartitionKind::List => "list",
109 crate::storage::query::ast::PartitionKind::Hash => "hash",
110 };
111 store.set_config_tree(
112 &format!("partition.{}.by", query.name),
113 &crate::serde_json::Value::String(kind_str.to_string()),
114 );
115 store.set_config_tree(
116 &format!("partition.{}.column", query.name),
117 &crate::serde_json::Value::String(spec.column.clone()),
118 );
119 }
120
121 if let Some(col) = &query.tenant_by {
132 store.set_config_tree(
133 &format!("tenant_tables.{}.column", query.name),
134 &crate::serde_json::Value::String(col.clone()),
135 );
136 self.register_tenant_table(&query.name, col);
137 }
138
139 let ttl_suffix = query
140 .default_ttl_ms
141 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142 .unwrap_or_default();
143
144 let tenant_suffix = query
145 .tenant_by
146 .as_ref()
147 .map(|col| format!(" (tenant-scoped by {col})"))
148 .unwrap_or_default();
149
150 Ok(RuntimeQueryResult::ok_message(
151 raw_query.to_string(),
152 &format!(
153 "table '{}' created{}{}",
154 query.name, ttl_suffix, tenant_suffix
155 ),
156 "create",
157 ))
158 }
159
160 fn execute_create_keyed_collection(
161 &self,
162 raw_query: &str,
163 query: &CreateTableQuery,
164 ) -> RedDBResult<RuntimeQueryResult> {
165 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166 if is_system_schema_name(&query.name) {
167 return Err(RedDBError::Query("system schema is read-only".to_string()));
168 }
169 let store = self.inner.db.store();
170 let label = polymorphic_resolver::model_name(query.collection_model);
171 if store.get_collection(&query.name).is_some() {
172 if query.if_not_exists {
173 return Ok(RuntimeQueryResult::ok_message(
174 raw_query.to_string(),
175 &format!("{label} '{}' already exists", query.name),
176 "create",
177 ));
178 }
179 return Err(RedDBError::Query(format!(
180 "{label} '{}' already exists",
181 query.name
182 )));
183 }
184
185 store
186 .create_collection(&query.name)
187 .map_err(|err| RedDBError::Internal(err.to_string()))?;
188 if query.collection_model == CollectionModel::Vault {
189 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190 let key_scope = if query.vault_own_master_key {
191 "own"
192 } else {
193 "cluster"
194 };
195 store.set_config_tree(
196 &format!("red.vault.{}.key_scope", query.name),
197 &crate::serde_json::Value::String(key_scope.to_string()),
198 );
199 store.set_config_tree(
200 &format!("red.vault.{}.status", query.name),
201 &crate::serde_json::Value::String("sealed".to_string()),
202 );
203 }
204 if query.collection_model == CollectionModel::Metrics {
205 for spec in &query.metrics_rollup_policies {
206 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207 .ok_or_else(|| {
208 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209 })?;
210 if policy.source != "raw" {
211 return Err(RedDBError::Query(format!(
212 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213 spec
214 )));
215 }
216 if !matches!(
217 policy.aggregation.as_str(),
218 "avg" | "sum" | "min" | "max" | "count"
219 ) {
220 return Err(RedDBError::Query(format!(
221 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222 spec
223 )));
224 }
225 }
226 if let Some(raw_retention_ms) = query.default_ttl_ms {
227 self.inner
228 .db
229 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230 store.set_config_tree(
231 &format!("red.metrics.{}.raw_retention_ms", query.name),
232 &crate::serde_json::Value::Number(raw_retention_ms as f64),
233 );
234 }
235 let tenant_identity = query
236 .tenant_by
237 .clone()
238 .unwrap_or_else(|| "current_tenant".to_string());
239 store.set_config_tree(
240 &format!("red.metrics.{}.tenant_identity", query.name),
241 &crate::serde_json::Value::String(tenant_identity),
242 );
243 store.set_config_tree(
244 &format!("red.metrics.{}.namespace", query.name),
245 &crate::serde_json::Value::String("default".to_string()),
246 );
247 if !query.metrics_rollup_policies.is_empty() {
248 store.set_config_tree(
249 &format!("red.metrics.{}.rollup_policies", query.name),
250 &crate::serde_json::Value::Array(
251 query
252 .metrics_rollup_policies
253 .iter()
254 .cloned()
255 .map(crate::serde_json::Value::String)
256 .collect(),
257 ),
258 );
259 }
260 }
261 let contract = if query.collection_model == CollectionModel::Metrics {
262 metrics_collection_contract(query)
263 } else {
264 keyed_collection_contract(&query.name, query.collection_model)
265 };
266 self.inner
267 .db
268 .save_collection_contract(contract)
269 .map_err(|err| RedDBError::Internal(err.to_string()))?;
270 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271 store.set_config_tree(
272 &format!("red.collection_tenants.{}", query.name),
273 &crate::serde_json::Value::String(tenant_id),
274 );
275 }
276 self.inner
277 .db
278 .persist_metadata()
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 self.invalidate_result_cache();
281
282 Ok(RuntimeQueryResult::ok_message(
283 raw_query.to_string(),
284 &format!("{label} '{}' created", query.name),
285 "create",
286 ))
287 }
288
289 pub fn execute_create_collection(
290 &self,
291 raw_query: &str,
292 query: &CreateCollectionQuery,
293 ) -> RedDBResult<RuntimeQueryResult> {
294 let model = match query.kind.as_str() {
295 "graph" => CollectionModel::Graph,
296 "document" => CollectionModel::Document,
297 "metrics" => CollectionModel::Metrics,
298 "blockchain" => CollectionModel::Table,
303 other => {
304 return Err(RedDBError::Query(format!(
305 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
306 )));
307 }
308 };
309 let create = CreateTableQuery {
310 collection_model: model,
311 name: query.name.clone(),
312 columns: Vec::new(),
313 if_not_exists: query.if_not_exists,
314 default_ttl_ms: None,
315 metrics_rollup_policies: Vec::new(),
316 context_index_fields: Vec::new(),
317 context_index_enabled: false,
318 timestamps: false,
319 partition_by: None,
320 tenant_by: None,
321 append_only: false,
322 subscriptions: Vec::new(),
323 vault_own_master_key: false,
324 };
325 let result = self.execute_create_table(raw_query, &create)?;
326 if query.kind == "blockchain" {
327 self.install_blockchain_kind(&query.name)?;
328 }
329 if !query.allowed_signers.is_empty() {
334 let actor = crate::runtime::impl_core::current_user_projected()
335 .unwrap_or_else(|| "@system/create-collection".to_string());
336 crate::runtime::signed_writes_kind::install(
337 &self.inner.db.store(),
338 &query.name,
339 &query.allowed_signers,
340 &actor,
341 );
342 }
343 Ok(result)
344 }
345
346 fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
350 use crate::runtime::blockchain_kind;
351 use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
352 use std::sync::Arc;
353
354 let store = self.inner.db.store();
355 blockchain_kind::mark_as_chain(&store, name);
356
357 let existing_tip = blockchain_kind::chain_tip(&store, name);
358 if existing_tip.height.is_some() {
359 return Ok(());
360 }
361
362 let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
363 let named: std::collections::HashMap<String, crate::storage::schema::Value> =
364 fields.into_iter().collect();
365 let entity = UnifiedEntity::new(
366 EntityId::new(0),
367 EntityKind::TableRow {
368 table: Arc::from(name),
369 row_id: 0,
370 },
371 EntityData::Row(RowData {
372 columns: Vec::new(),
373 named: Some(named),
374 schema: None,
375 }),
376 );
377 store
378 .insert_auto(name, entity)
379 .map_err(|err| RedDBError::Internal(err.to_string()))?;
380 if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
383 self.inner
384 .chain_tip_cache
385 .lock()
386 .insert(name.to_string(), tip);
387 }
388 Ok(())
389 }
390
391 pub fn execute_create_vector(
392 &self,
393 raw_query: &str,
394 query: &CreateVectorQuery,
395 ) -> RedDBResult<RuntimeQueryResult> {
396 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
397 if is_system_schema_name(&query.name) {
398 return Err(RedDBError::Query("system schema is read-only".to_string()));
399 }
400 let store = self.inner.db.store();
401 if store.get_collection(&query.name).is_some() {
402 if query.if_not_exists {
403 return Ok(RuntimeQueryResult::ok_message(
404 raw_query.to_string(),
405 &format!("vector '{}' already exists", query.name),
406 "create",
407 ));
408 }
409 return Err(RedDBError::Query(format!(
410 "vector '{}' already exists",
411 query.name
412 )));
413 }
414
415 store
416 .create_collection(&query.name)
417 .map_err(|err| RedDBError::Internal(err.to_string()))?;
418 self.inner
419 .db
420 .save_collection_contract(vector_collection_contract(query))
421 .map_err(|err| RedDBError::Internal(err.to_string()))?;
422 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
423 store.set_config_tree(
424 &format!("red.collection_tenants.{}", query.name),
425 &crate::serde_json::Value::String(tenant_id),
426 );
427 }
428 self.inner
429 .db
430 .persist_metadata()
431 .map_err(|err| RedDBError::Internal(err.to_string()))?;
432 self.invalidate_result_cache();
433
434 Ok(RuntimeQueryResult::ok_message(
435 raw_query.to_string(),
436 &format!("vector '{}' created", query.name),
437 "create",
438 ))
439 }
440
441 fn provision_vault_key_material(
442 &self,
443 collection: &str,
444 own_master_key: bool,
445 ) -> RedDBResult<()> {
446 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
447 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
448 })?;
449 if !auth_store.is_vault_backed() {
450 return Err(RedDBError::Query(
451 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
452 ));
453 }
454
455 if auth_store.vault_secret_key().is_none() {
456 let key = crate::auth::store::random_bytes(32);
457 auth_store
458 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
459 .map_err(|err| RedDBError::Query(err.to_string()))?;
460 }
461
462 if own_master_key {
463 let key = crate::auth::store::random_bytes(32);
464 auth_store
465 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
466 .map_err(|err| RedDBError::Query(err.to_string()))?;
467 }
468
469 Ok(())
470 }
471
472 pub fn execute_drop_table(
476 &self,
477 raw_query: &str,
478 query: &DropTableQuery,
479 ) -> RedDBResult<RuntimeQueryResult> {
480 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
481 let store = self.inner.db.store();
482
483 if is_system_schema_name(&query.name) {
484 return Err(RedDBError::Query("system schema is read-only".to_string()));
485 }
486
487 let exists = store.get_collection(&query.name).is_some();
488 if !exists {
489 if query.if_exists {
490 return Ok(RuntimeQueryResult::ok_message(
491 raw_query.to_string(),
492 &format!("table '{}' does not exist", query.name),
493 "drop",
494 ));
495 }
496 return Err(RedDBError::NotFound(format!(
497 "table '{}' not found",
498 query.name
499 )));
500 }
501 let actual =
502 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
503 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
504
505 let final_count = store
508 .get_collection(&query.name)
509 .map(|manager| manager.query_all(|_| true).len() as u64)
510 .unwrap_or(0);
511 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
512 self,
513 &query.name,
514 final_count,
515 )?;
516
517 let orphaned_indices: Vec<String> = self
518 .inner
519 .index_store
520 .list_indices(&query.name)
521 .into_iter()
522 .map(|index| index.name)
523 .collect();
524 for name in &orphaned_indices {
525 self.inner.index_store.drop_index(name, &query.name);
526 }
527
528 store
529 .drop_collection(&query.name)
530 .map_err(|err| RedDBError::Internal(err.to_string()))?;
531 self.inner.db.invalidate_vector_index(&query.name);
532 self.inner.db.clear_collection_default_ttl_ms(&query.name);
533 self.inner
534 .db
535 .remove_collection_contract(&query.name)
536 .map_err(|err| RedDBError::Internal(err.to_string()))?;
537 self.clear_table_planner_stats(&query.name);
538 self.invalidate_result_cache();
539 if let Some(store) = self.inner.auth_store.read().clone() {
543 store.invalidate_visible_collections_cache();
544 }
545 self.inner
546 .db
547 .persist_metadata()
548 .map_err(|err| RedDBError::Internal(err.to_string()))?;
549 self.schema_vocabulary_apply(
555 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
556 collection: query.name.clone(),
557 },
558 );
559
560 Ok(RuntimeQueryResult::ok_message(
561 raw_query.to_string(),
562 &format!("table '{}' dropped", query.name),
563 "drop",
564 ))
565 }
566
567 pub fn execute_drop_graph(
568 &self,
569 raw_query: &str,
570 query: &DropGraphQuery,
571 ) -> RedDBResult<RuntimeQueryResult> {
572 self.execute_drop_typed_collection(
573 raw_query,
574 &query.name,
575 query.if_exists,
576 CollectionModel::Graph,
577 "graph",
578 )
579 }
580
581 pub fn execute_drop_vector(
582 &self,
583 raw_query: &str,
584 query: &DropVectorQuery,
585 ) -> RedDBResult<RuntimeQueryResult> {
586 self.execute_drop_typed_collection(
587 raw_query,
588 &query.name,
589 query.if_exists,
590 CollectionModel::Vector,
591 "vector",
592 )
593 }
594
595 pub fn execute_drop_document(
596 &self,
597 raw_query: &str,
598 query: &DropDocumentQuery,
599 ) -> RedDBResult<RuntimeQueryResult> {
600 self.execute_drop_typed_collection(
601 raw_query,
602 &query.name,
603 query.if_exists,
604 CollectionModel::Document,
605 "document",
606 )
607 }
608
609 pub fn execute_drop_kv(
610 &self,
611 raw_query: &str,
612 query: &DropKvQuery,
613 ) -> RedDBResult<RuntimeQueryResult> {
614 let label = polymorphic_resolver::model_name(query.model);
615 self.execute_drop_typed_collection(
616 raw_query,
617 &query.name,
618 query.if_exists,
619 query.model,
620 label,
621 )
622 }
623
624 pub fn execute_drop_collection(
625 &self,
626 raw_query: &str,
627 query: &DropCollectionQuery,
628 ) -> RedDBResult<RuntimeQueryResult> {
629 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
630 if is_system_schema_name(&query.name) {
631 return Err(RedDBError::Query("system schema is read-only".to_string()));
632 }
633 let store = self.inner.db.store();
634 if store.get_collection(&query.name).is_none() {
635 if query.if_exists {
636 return Ok(RuntimeQueryResult::ok_message(
637 raw_query.to_string(),
638 &format!("collection '{}' does not exist", query.name),
639 "drop",
640 ));
641 }
642 return Err(RedDBError::NotFound(format!(
643 "collection '{}' not found",
644 query.name
645 )));
646 }
647
648 let actual =
649 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
650 if let Some(expected) = query.model {
651 polymorphic_resolver::ensure_model_match(expected, actual)?;
652 }
653
654 match actual {
655 CollectionModel::Table => self.execute_drop_table(
656 raw_query,
657 &DropTableQuery {
658 name: query.name.clone(),
659 if_exists: query.if_exists,
660 },
661 ),
662 CollectionModel::TimeSeries => self.execute_drop_timeseries(
663 raw_query,
664 &DropTimeSeriesQuery {
665 name: query.name.clone(),
666 if_exists: query.if_exists,
667 },
668 ),
669 CollectionModel::Queue => self.execute_drop_queue(
670 raw_query,
671 &DropQueueQuery {
672 name: query.name.clone(),
673 if_exists: query.if_exists,
674 },
675 ),
676 CollectionModel::Graph => self.execute_drop_graph(
677 raw_query,
678 &DropGraphQuery {
679 name: query.name.clone(),
680 if_exists: query.if_exists,
681 },
682 ),
683 CollectionModel::Vector => self.execute_drop_vector(
684 raw_query,
685 &DropVectorQuery {
686 name: query.name.clone(),
687 if_exists: query.if_exists,
688 },
689 ),
690 CollectionModel::Document => self.execute_drop_document(
691 raw_query,
692 &DropDocumentQuery {
693 name: query.name.clone(),
694 if_exists: query.if_exists,
695 },
696 ),
697 CollectionModel::Kv => self.execute_drop_kv(
698 raw_query,
699 &DropKvQuery {
700 name: query.name.clone(),
701 if_exists: query.if_exists,
702 model: CollectionModel::Kv,
703 },
704 ),
705 CollectionModel::Config => self.execute_drop_kv(
706 raw_query,
707 &DropKvQuery {
708 name: query.name.clone(),
709 if_exists: query.if_exists,
710 model: CollectionModel::Config,
711 },
712 ),
713 CollectionModel::Vault => self.execute_drop_kv(
714 raw_query,
715 &DropKvQuery {
716 name: query.name.clone(),
717 if_exists: query.if_exists,
718 model: CollectionModel::Vault,
719 },
720 ),
721 CollectionModel::Hll => self.execute_probabilistic_command(
722 raw_query,
723 &ProbabilisticCommand::DropHll {
724 name: query.name.clone(),
725 if_exists: query.if_exists,
726 },
727 ),
728 CollectionModel::Sketch => self.execute_probabilistic_command(
729 raw_query,
730 &ProbabilisticCommand::DropSketch {
731 name: query.name.clone(),
732 if_exists: query.if_exists,
733 },
734 ),
735 CollectionModel::Filter => self.execute_probabilistic_command(
736 raw_query,
737 &ProbabilisticCommand::DropFilter {
738 name: query.name.clone(),
739 if_exists: query.if_exists,
740 },
741 ),
742 CollectionModel::Metrics => self.execute_drop_typed_collection(
743 raw_query,
744 &query.name,
745 query.if_exists,
746 CollectionModel::Metrics,
747 "metrics",
748 ),
749 CollectionModel::Mixed => self.execute_drop_typed_collection(
750 raw_query,
751 &query.name,
752 query.if_exists,
753 CollectionModel::Mixed,
754 "collection",
755 ),
756 }
757 }
758
759 pub fn execute_alter_table(
765 &self,
766 raw_query: &str,
767 query: &AlterTableQuery,
768 ) -> RedDBResult<RuntimeQueryResult> {
769 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
770 let store = self.inner.db.store();
771
772 if store.get_collection(&query.name).is_none() {
774 return Err(RedDBError::NotFound(format!(
775 "table '{}' not found",
776 query.name
777 )));
778 }
779
780 let mut messages = Vec::new();
781
782 let fields_added: Vec<String> = query
784 .operations
785 .iter()
786 .filter_map(|op| {
787 if let AlterOperation::AddColumn(col) = op {
788 Some(col.name.clone())
789 } else {
790 None
791 }
792 })
793 .collect();
794 let fields_removed: Vec<String> = query
795 .operations
796 .iter()
797 .filter_map(|op| {
798 if let AlterOperation::DropColumn(name) = op {
799 Some(name.clone())
800 } else {
801 None
802 }
803 })
804 .collect();
805
806 for op in &query.operations {
807 match op {
808 AlterOperation::AddColumn(col) => {
809 messages.push(format!("column '{}' added", col.name));
811 }
812 AlterOperation::DropColumn(name) => {
813 messages.push(format!("column '{}' dropped", name));
814 }
815 AlterOperation::RenameColumn { from, to } => {
816 messages.push(format!("column '{}' renamed to '{}'", from, to));
817 }
818 AlterOperation::AttachPartition { child, bound } => {
819 store.set_config_tree(
823 &format!("partition.{}.children.{}", query.name, child),
824 &crate::serde_json::Value::String(bound.clone()),
825 );
826 messages.push(format!(
827 "partition '{child}' attached to '{}' ({bound})",
828 query.name
829 ));
830 }
831 AlterOperation::DetachPartition { child } => {
832 store.set_config_tree(
833 &format!("partition.{}.children.{}", query.name, child),
834 &crate::serde_json::Value::Null,
835 );
836 messages.push(format!(
837 "partition '{child}' detached from '{}'",
838 query.name
839 ));
840 }
841 AlterOperation::EnableRowLevelSecurity => {
842 self.inner
843 .rls_enabled_tables
844 .write()
845 .insert(query.name.clone());
846 store.set_config_tree(
848 &format!("rls.enabled.{}", query.name),
849 &crate::serde_json::Value::Bool(true),
850 );
851 self.invalidate_plan_cache();
852 messages.push(format!("row level security enabled on '{}'", query.name));
853 }
854 AlterOperation::DisableRowLevelSecurity => {
855 self.inner.rls_enabled_tables.write().remove(&query.name);
856 store.set_config_tree(
857 &format!("rls.enabled.{}", query.name),
858 &crate::serde_json::Value::Null,
859 );
860 self.invalidate_plan_cache();
861 messages.push(format!("row level security disabled on '{}'", query.name));
862 }
863 AlterOperation::EnableTenancy { column } => {
865 store.set_config_tree(
866 &format!("tenant_tables.{}.column", query.name),
867 &crate::serde_json::Value::String(column.clone()),
868 );
869 self.register_tenant_table(&query.name, column);
870 self.invalidate_plan_cache();
871 messages.push(format!(
872 "tenancy enabled on '{}' by column '{column}'",
873 query.name
874 ));
875 }
876 AlterOperation::DisableTenancy => {
877 store.set_config_tree(
878 &format!("tenant_tables.{}.column", query.name),
879 &crate::serde_json::Value::Null,
880 );
881 self.unregister_tenant_table(&query.name);
882 self.invalidate_plan_cache();
883 messages.push(format!("tenancy disabled on '{}'", query.name));
884 }
885 AlterOperation::SetAppendOnly(on) => {
886 messages.push(format!(
891 "append_only {} on '{}'",
892 if *on { "enabled" } else { "disabled" },
893 query.name
894 ));
895 }
896 AlterOperation::SetVersioned(on) => {
897 self.vcs_set_versioned(&query.name, *on)?;
904 messages.push(format!(
905 "versioned {} on '{}'",
906 if *on { "enabled" } else { "disabled" },
907 query.name
908 ));
909 }
910 AlterOperation::EnableEvents(subscription) => {
911 let mut subscription = subscription.clone();
912 subscription.source = query.name.clone();
913 validate_event_subscriptions(
914 self,
915 &query.name,
916 std::slice::from_ref(&subscription),
917 )?;
918 ensure_event_target_queue(self, &subscription.target_queue)?;
919 messages.push(format!(
920 "events enabled on '{}' to '{}'",
921 query.name, subscription.target_queue
922 ));
923 }
924 AlterOperation::DisableEvents => {
925 messages.push(format!("events disabled on '{}'", query.name));
926 }
927 AlterOperation::AddSubscription { name, descriptor } => {
928 let mut sub = descriptor.clone();
929 sub.name = name.clone();
930 sub.source = query.name.clone();
931 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
932 ensure_event_target_queue(self, &sub.target_queue)?;
933 messages.push(format!(
934 "subscription '{}' added on '{}' to '{}'",
935 name, query.name, sub.target_queue
936 ));
937 }
938 AlterOperation::DropSubscription { name } => {
939 messages.push(format!(
940 "subscription '{}' dropped on '{}'",
941 name, query.name
942 ));
943 }
944 AlterOperation::AddSigner { pubkey } => {
945 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
952 return Err(RedDBError::Query(format!(
953 "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
954 recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
955 query.name
956 )));
957 }
958 let actor = crate::runtime::impl_core::current_user_projected()
959 .unwrap_or_else(|| "@system/alter".to_string());
960 let changed = crate::runtime::signed_writes_kind::add_signer(
961 &store,
962 &query.name,
963 *pubkey,
964 &actor,
965 );
966 messages.push(format!(
967 "signer {} on '{}'",
968 if changed { "added" } else { "already present" },
969 query.name
970 ));
971 }
972 AlterOperation::RevokeSigner { pubkey } => {
973 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
974 return Err(RedDBError::Query(format!(
975 "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
976 query.name
977 )));
978 }
979 let actor = crate::runtime::impl_core::current_user_projected()
980 .unwrap_or_else(|| "@system/alter".to_string());
981 let changed = crate::runtime::signed_writes_kind::revoke_signer(
982 &store,
983 &query.name,
984 pubkey,
985 &actor,
986 );
987 messages.push(format!(
988 "signer {} on '{}'",
989 if changed {
990 "revoked"
991 } else {
992 "already revoked"
993 },
994 query.name
995 ));
996 }
997 AlterOperation::SetRetention { duration_ms } => {
998 let existing = self.inner.db.collection_contract(&query.name);
1004 let has_ts_column = existing
1005 .as_ref()
1006 .map(retention_timestamp_column_exists)
1007 .unwrap_or(false);
1008 if !has_ts_column {
1009 return Err(RedDBError::Query(format!(
1010 "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1011 column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1012 or enable WITH timestamps = true before setting a \
1013 retention policy",
1014 query.name
1015 )));
1016 }
1017 messages.push(format!(
1018 "retention set to {duration_ms} ms on '{}'",
1019 query.name
1020 ));
1021 }
1022 AlterOperation::UnsetRetention => {
1023 messages.push(format!("retention cleared on '{}'", query.name));
1024 }
1025 }
1026 }
1027
1028 let mut contract = self
1029 .inner
1030 .db
1031 .collection_contract(&query.name)
1032 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1033 apply_alter_operations_to_contract(&mut contract, &query.operations);
1034 contract.version = contract.version.saturating_add(1);
1035 contract.updated_at_unix_ms = current_unix_ms();
1036 self.inner
1037 .db
1038 .save_collection_contract(contract)
1039 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1040 if !fields_added.is_empty() || !fields_removed.is_empty() {
1044 let sub_names: Vec<String> = self
1045 .inner
1046 .db
1047 .collection_contract(&query.name)
1048 .map(|c| {
1049 c.subscriptions
1050 .iter()
1051 .filter(|s| s.enabled)
1052 .map(|s| s.name.clone())
1053 .collect()
1054 })
1055 .unwrap_or_default();
1056 if !sub_names.is_empty() {
1057 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1058 collection: query.name.clone(),
1059 subscription_names: sub_names.join(", "),
1060 fields_added: fields_added.join(", "),
1061 fields_removed: fields_removed.join(", "),
1062 lsn: self.cdc_current_lsn(),
1063 }
1064 .emit_global();
1065 }
1066 }
1067
1068 self.clear_table_planner_stats(&query.name);
1069 self.invalidate_result_cache();
1070 let post_alter_columns: Vec<String> = self
1075 .inner
1076 .db
1077 .collection_contract(&query.name)
1078 .map(|contract| {
1079 contract
1080 .declared_columns
1081 .iter()
1082 .map(|col| col.name.clone())
1083 .collect()
1084 })
1085 .unwrap_or_default();
1086 self.schema_vocabulary_apply(
1087 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1088 collection: query.name.clone(),
1089 columns: post_alter_columns,
1090 type_tags: Vec::new(),
1091 description: None,
1092 },
1093 );
1094
1095 let message = if messages.is_empty() {
1096 format!("table '{}' altered (no operations)", query.name)
1097 } else {
1098 format!("table '{}' altered: {}", query.name, messages.join(", "))
1099 };
1100
1101 Ok(RuntimeQueryResult::ok_message(
1102 raw_query.to_string(),
1103 &message,
1104 "alter",
1105 ))
1106 }
1107
1108 pub fn execute_explain_alter(
1115 &self,
1116 raw_query: &str,
1117 query: &ExplainAlterQuery,
1118 ) -> RedDBResult<RuntimeQueryResult> {
1119 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1123
1124 let current_contract = self.inner.db.collection_contract(&query.target.name);
1125
1126 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1127 .as_ref()
1128 .map(|c| c.declared_columns.clone())
1129 .unwrap_or_default();
1130
1131 let diff = super::schema_diff::compute_column_diff(
1132 &query.target.name,
1133 ¤t_columns,
1134 &query.target.columns,
1135 );
1136
1137 let rendered = match query.format {
1138 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1139 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1140 };
1141
1142 let format_label = match query.format {
1143 ExplainFormat::Sql => "sql",
1144 ExplainFormat::Json => "json",
1145 };
1146
1147 let columns = vec![
1148 "table".to_string(),
1149 "format".to_string(),
1150 "diff".to_string(),
1151 ];
1152 let row = vec![
1153 ("table".to_string(), Value::text(query.target.name.clone())),
1154 ("format".to_string(), Value::text(format_label.to_string())),
1155 ("diff".to_string(), Value::text(rendered)),
1156 ];
1157
1158 Ok(RuntimeQueryResult::ok_records(
1159 raw_query.to_string(),
1160 columns,
1161 vec![row],
1162 "explain",
1163 ))
1164 }
1165
1166 pub fn execute_create_index(
1171 &self,
1172 raw_query: &str,
1173 query: &CreateIndexQuery,
1174 ) -> RedDBResult<RuntimeQueryResult> {
1175 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1176 let store = self.inner.db.store();
1177
1178 let manager = store
1180 .get_collection(&query.table)
1181 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1182
1183 let method_kind = match query.method {
1184 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1185 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1186 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1187 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1188 };
1189
1190 let entities = manager.query_all(|_| true);
1200 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1201 entities
1202 .iter()
1203 .map(|e| {
1204 let fields = match &e.data {
1205 crate::storage::EntityData::Row(row) => {
1206 if let Some(ref named) = row.named {
1207 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1208 } else if let Some(ref schema) = row.schema {
1209 schema
1213 .iter()
1214 .zip(row.columns.iter())
1215 .map(|(k, v)| (k.clone(), v.clone()))
1216 .collect()
1217 } else {
1218 Vec::new()
1219 }
1220 }
1221 crate::storage::EntityData::Node(node) => node
1222 .properties
1223 .iter()
1224 .map(|(k, v)| (k.clone(), v.clone()))
1225 .collect(),
1226 _ => Vec::new(),
1227 };
1228 (e.id, fields)
1229 })
1230 .collect();
1231
1232 let indexed_count = self
1234 .inner
1235 .index_store
1236 .create_index(
1237 &query.name,
1238 &query.table,
1239 &query.columns,
1240 method_kind,
1241 query.unique,
1242 &entity_fields,
1243 )
1244 .map_err(RedDBError::Internal)?;
1245
1246 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1247 &query.table,
1248 &entity_fields,
1249 );
1250 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1251 self.invalidate_plan_cache();
1252
1253 self.inner
1255 .index_store
1256 .register(super::index_store::RegisteredIndex {
1257 name: query.name.clone(),
1258 collection: query.table.clone(),
1259 columns: query.columns.clone(),
1260 method: method_kind,
1261 unique: query.unique,
1262 });
1263 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1267 collection: query.table.clone(),
1268 index: query.name.clone(),
1269 columns: query.columns.clone(),
1270 });
1271
1272 let method_str = format!("{}", query.method);
1273 let unique_str = if query.unique { "unique " } else { "" };
1274 let cols = query.columns.join(", ");
1275
1276 Ok(RuntimeQueryResult::ok_message(
1277 raw_query.to_string(),
1278 &format!(
1279 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1280 unique_str, query.name, query.table, cols, method_str, indexed_count
1281 ),
1282 "create",
1283 ))
1284 }
1285
1286 pub fn execute_drop_index(
1290 &self,
1291 raw_query: &str,
1292 query: &DropIndexQuery,
1293 ) -> RedDBResult<RuntimeQueryResult> {
1294 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1295 let store = self.inner.db.store();
1296
1297 if store.get_collection(&query.table).is_none() {
1299 if query.if_exists {
1300 return Ok(RuntimeQueryResult::ok_message(
1301 raw_query.to_string(),
1302 &format!("table '{}' does not exist", query.table),
1303 "drop",
1304 ));
1305 }
1306 return Err(RedDBError::NotFound(format!(
1307 "table '{}' not found",
1308 query.table
1309 )));
1310 }
1311
1312 self.inner.index_store.drop_index(&query.name, &query.table);
1314 self.invalidate_plan_cache();
1315 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1317 collection: query.table.clone(),
1318 index: query.name.clone(),
1319 });
1320
1321 Ok(RuntimeQueryResult::ok_message(
1322 raw_query.to_string(),
1323 &format!("index '{}' dropped from '{}'", query.name, query.table),
1324 "drop",
1325 ))
1326 }
1327
1328 fn execute_drop_typed_collection(
1329 &self,
1330 raw_query: &str,
1331 name: &str,
1332 if_exists: bool,
1333 expected_model: CollectionModel,
1334 label: &str,
1335 ) -> RedDBResult<RuntimeQueryResult> {
1336 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1337 if is_system_schema_name(name) {
1338 return Err(RedDBError::Query("system schema is read-only".to_string()));
1339 }
1340 let store = self.inner.db.store();
1341 if store.get_collection(name).is_none() {
1342 if if_exists {
1343 return Ok(RuntimeQueryResult::ok_message(
1344 raw_query.to_string(),
1345 &format!("{label} '{name}' does not exist"),
1346 "drop",
1347 ));
1348 }
1349 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1350 }
1351
1352 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1353 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1354 self.drop_collection_storage(raw_query, name, label)
1355 }
1356
1357 pub fn execute_truncate(
1358 &self,
1359 raw_query: &str,
1360 query: &TruncateQuery,
1361 ) -> RedDBResult<RuntimeQueryResult> {
1362 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1363 if is_system_schema_name(&query.name) {
1364 return Err(RedDBError::Query("system schema is read-only".to_string()));
1365 }
1366
1367 let label = query
1368 .model
1369 .map(polymorphic_resolver::model_name)
1370 .unwrap_or("collection");
1371 let store = self.inner.db.store();
1372 if store.get_collection(&query.name).is_none() {
1373 if query.if_exists {
1374 return Ok(RuntimeQueryResult::ok_message(
1375 raw_query.to_string(),
1376 &format!("{label} '{}' does not exist", query.name),
1377 "truncate",
1378 ));
1379 }
1380 return Err(RedDBError::NotFound(format!(
1381 "{label} '{}' not found",
1382 query.name
1383 )));
1384 }
1385
1386 let actual =
1387 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1388 if let Some(expected) = query.model {
1389 polymorphic_resolver::ensure_model_match(expected, actual)?;
1390 }
1391
1392 if actual == CollectionModel::Queue {
1393 return self.execute_queue_command(
1394 raw_query,
1395 &QueueCommand::Purge {
1396 queue: query.name.clone(),
1397 },
1398 );
1399 }
1400
1401 let affected = self.truncate_collection_entities(&query.name)?;
1403 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1405 self.inner.db.invalidate_vector_index(&query.name);
1406 self.clear_table_planner_stats(&query.name);
1407 self.invalidate_result_cache();
1408
1409 Ok(RuntimeQueryResult::ok_message(
1410 raw_query.to_string(),
1411 &format!(
1412 "{affected} entities truncated from {label} '{}'",
1413 query.name
1414 ),
1415 "truncate",
1416 ))
1417 }
1418
1419 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1420 let store = self.inner.db.store();
1421 let Some(manager) = store.get_collection(name) else {
1422 return Ok(0);
1423 };
1424 let entities = manager.query_all(|_| true);
1425 if entities.is_empty() {
1426 return Ok(0);
1427 }
1428
1429 for entity in &entities {
1430 let fields = entity_index_fields(&entity.data);
1431 self.inner
1432 .index_store
1433 .index_entity_delete(name, entity.id, &fields)
1434 .map_err(RedDBError::Internal)?;
1435 }
1436
1437 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1438 let deleted_ids = store
1439 .delete_batch(name, &ids)
1440 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1441 for id in &deleted_ids {
1442 store.context_index().remove_entity(*id);
1443 }
1444 Ok(deleted_ids.len() as u64)
1445 }
1446
1447 fn drop_collection_storage(
1448 &self,
1449 raw_query: &str,
1450 name: &str,
1451 label: &str,
1452 ) -> RedDBResult<RuntimeQueryResult> {
1453 let store = self.inner.db.store();
1454
1455 let final_count = store
1458 .get_collection(name)
1459 .map(|manager| manager.query_all(|_| true).len() as u64)
1460 .unwrap_or(0);
1461 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1462 self,
1463 name,
1464 final_count,
1465 )?;
1466
1467 let orphaned_indices: Vec<String> = self
1468 .inner
1469 .index_store
1470 .list_indices(name)
1471 .into_iter()
1472 .map(|index| index.name)
1473 .collect();
1474 for index_name in &orphaned_indices {
1475 self.inner.index_store.drop_index(index_name, name);
1476 }
1477
1478 store
1479 .drop_collection(name)
1480 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1481 self.inner.db.invalidate_vector_index(name);
1482 self.inner.db.clear_collection_default_ttl_ms(name);
1483 self.inner
1484 .db
1485 .remove_collection_contract(name)
1486 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1487 self.clear_table_planner_stats(name);
1488 self.invalidate_result_cache();
1489 if let Some(store) = self.inner.auth_store.read().clone() {
1490 store.invalidate_visible_collections_cache();
1491 }
1492 self.inner
1493 .db
1494 .persist_metadata()
1495 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1496 self.schema_vocabulary_apply(
1497 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1498 collection: name.to_string(),
1499 },
1500 );
1501
1502 Ok(RuntimeQueryResult::ok_message(
1503 raw_query.to_string(),
1504 &format!("{label} '{name}' dropped"),
1505 "drop",
1506 ))
1507 }
1508}
1509
/// Returns `true` when `name` is the literal `red`, or starts with `red.`
/// or `__red_schema_`. The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const SYSTEM_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];
    if name == "red" {
        return true;
    }
    SYSTEM_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(prefix))
}
1513
1514fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1515 match data {
1516 EntityData::Row(row) => {
1517 if let Some(ref named) = row.named {
1518 named
1519 .iter()
1520 .map(|(key, value)| (key.clone(), value.clone()))
1521 .collect()
1522 } else if let Some(ref schema) = row.schema {
1523 schema
1524 .iter()
1525 .zip(row.columns.iter())
1526 .map(|(key, value)| (key.clone(), value.clone()))
1527 .collect()
1528 } else {
1529 Vec::new()
1530 }
1531 }
1532 EntityData::Node(node) => node
1533 .properties
1534 .iter()
1535 .map(|(key, value)| (key.clone(), value.clone()))
1536 .collect(),
1537 _ => Vec::new(),
1538 }
1539}
1540
1541fn collection_contract_from_create_table(
1542 query: &CreateTableQuery,
1543) -> RedDBResult<crate::physical::CollectionContract> {
1544 let now = current_unix_ms();
1545 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1546 .columns
1547 .iter()
1548 .map(declared_column_contract_from_ddl)
1549 .collect();
1550 if query.timestamps {
1551 declared_columns.push(crate::physical::DeclaredColumnContract {
1555 name: "created_at".to_string(),
1556 data_type: "BIGINT".to_string(),
1557 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1558 not_null: true,
1559 default: None,
1560 compress: None,
1561 unique: false,
1562 primary_key: false,
1563 enum_variants: Vec::new(),
1564 array_element: None,
1565 decimal_precision: None,
1566 });
1567 declared_columns.push(crate::physical::DeclaredColumnContract {
1568 name: "updated_at".to_string(),
1569 data_type: "BIGINT".to_string(),
1570 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1571 not_null: true,
1572 default: None,
1573 compress: None,
1574 unique: false,
1575 primary_key: false,
1576 enum_variants: Vec::new(),
1577 array_element: None,
1578 decimal_precision: None,
1579 });
1580 }
1581 Ok(crate::physical::CollectionContract {
1582 name: query.name.clone(),
1583 declared_model: crate::catalog::CollectionModel::Table,
1584 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1585 origin: crate::physical::ContractOrigin::Explicit,
1586 version: 1,
1587 created_at_unix_ms: now,
1588 updated_at_unix_ms: now,
1589 default_ttl_ms: query.default_ttl_ms,
1590 vector_dimension: None,
1591 vector_metric: None,
1592 context_index_fields: query.context_index_fields.clone(),
1593 declared_columns,
1594 table_def: Some(build_table_def_from_create_table(query)?),
1595 timestamps_enabled: query.timestamps,
1596 context_index_enabled: query.context_index_enabled
1597 || !query.context_index_fields.is_empty(),
1598 metrics_raw_retention_ms: None,
1599 metrics_rollup_policies: Vec::new(),
1600 metrics_tenant_identity: None,
1601 metrics_namespace: None,
1602 append_only: query.append_only,
1603 subscriptions: query.subscriptions.clone(),
1604 session_key: None,
1605 session_gap_ms: None,
1606 retention_duration_ms: None,
1607 })
1608}
1609
1610fn default_collection_contract_for_existing_table(
1611 name: &str,
1612) -> crate::physical::CollectionContract {
1613 let now = current_unix_ms();
1614 crate::physical::CollectionContract {
1615 name: name.to_string(),
1616 declared_model: crate::catalog::CollectionModel::Table,
1617 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1618 origin: crate::physical::ContractOrigin::Explicit,
1619 version: 0,
1620 created_at_unix_ms: now,
1621 updated_at_unix_ms: now,
1622 default_ttl_ms: None,
1623 vector_dimension: None,
1624 vector_metric: None,
1625 context_index_fields: Vec::new(),
1626 declared_columns: Vec::new(),
1627 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1628 timestamps_enabled: false,
1629 context_index_enabled: false,
1630 metrics_raw_retention_ms: None,
1631 metrics_rollup_policies: Vec::new(),
1632 metrics_tenant_identity: None,
1633 metrics_namespace: None,
1634 append_only: false,
1635 subscriptions: Vec::new(),
1636 session_key: None,
1637 session_gap_ms: None,
1638 retention_duration_ms: None,
1639 }
1640}
1641
1642fn keyed_collection_contract(
1643 name: &str,
1644 model: crate::catalog::CollectionModel,
1645) -> crate::physical::CollectionContract {
1646 let now = current_unix_ms();
1647 crate::physical::CollectionContract {
1648 name: name.to_string(),
1649 declared_model: model,
1650 schema_mode: crate::catalog::SchemaMode::Dynamic,
1651 origin: crate::physical::ContractOrigin::Explicit,
1652 version: 1,
1653 created_at_unix_ms: now,
1654 updated_at_unix_ms: now,
1655 default_ttl_ms: None,
1656 vector_dimension: None,
1657 vector_metric: None,
1658 context_index_fields: Vec::new(),
1659 declared_columns: Vec::new(),
1660 table_def: None,
1661 timestamps_enabled: false,
1662 context_index_enabled: false,
1663 metrics_raw_retention_ms: None,
1664 metrics_rollup_policies: Vec::new(),
1665 metrics_tenant_identity: None,
1666 metrics_namespace: None,
1667 append_only: false,
1668 subscriptions: Vec::new(),
1669 session_key: None,
1670 session_gap_ms: None,
1671 retention_duration_ms: None,
1672 }
1673}
1674
1675fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1676 let now = current_unix_ms();
1677 crate::physical::CollectionContract {
1678 name: query.name.clone(),
1679 declared_model: crate::catalog::CollectionModel::Metrics,
1680 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1681 origin: crate::physical::ContractOrigin::Explicit,
1682 version: 1,
1683 created_at_unix_ms: now,
1684 updated_at_unix_ms: now,
1685 default_ttl_ms: query.default_ttl_ms,
1686 vector_dimension: None,
1687 vector_metric: None,
1688 context_index_fields: Vec::new(),
1689 declared_columns: Vec::new(),
1690 table_def: None,
1691 timestamps_enabled: false,
1692 context_index_enabled: false,
1693 metrics_raw_retention_ms: query.default_ttl_ms,
1694 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1695 metrics_tenant_identity: Some(
1696 query
1697 .tenant_by
1698 .clone()
1699 .unwrap_or_else(|| "current_tenant".to_string()),
1700 ),
1701 metrics_namespace: Some("default".to_string()),
1702 append_only: true,
1703 subscriptions: Vec::new(),
1704 session_key: None,
1705 session_gap_ms: None,
1706 retention_duration_ms: None,
1707 }
1708}
1709
1710fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1711 let now = current_unix_ms();
1712 crate::physical::CollectionContract {
1713 name: query.name.clone(),
1714 declared_model: crate::catalog::CollectionModel::Vector,
1715 schema_mode: crate::catalog::SchemaMode::Dynamic,
1716 origin: crate::physical::ContractOrigin::Explicit,
1717 version: 1,
1718 created_at_unix_ms: now,
1719 updated_at_unix_ms: now,
1720 default_ttl_ms: None,
1721 vector_dimension: Some(query.dimension),
1722 vector_metric: Some(query.metric),
1723 context_index_fields: Vec::new(),
1724 declared_columns: Vec::new(),
1725 table_def: None,
1726 timestamps_enabled: false,
1727 context_index_enabled: false,
1728 metrics_raw_retention_ms: None,
1729 metrics_rollup_policies: Vec::new(),
1730 metrics_tenant_identity: None,
1731 metrics_namespace: None,
1732 append_only: false,
1733 subscriptions: Vec::new(),
1734 session_key: None,
1735 session_gap_ms: None,
1736 retention_duration_ms: None,
1737 }
1738}
1739
1740fn declared_column_contract_from_ddl(
1741 column: &CreateColumnDef,
1742) -> crate::physical::DeclaredColumnContract {
1743 crate::physical::DeclaredColumnContract {
1744 name: column.name.clone(),
1745 data_type: column.data_type.clone(),
1746 sql_type: Some(column.sql_type.clone()),
1747 not_null: column.not_null,
1748 default: column.default.clone(),
1749 compress: column.compress,
1750 unique: column.unique,
1751 primary_key: column.primary_key,
1752 enum_variants: column.enum_variants.clone(),
1753 array_element: column.array_element.clone(),
1754 decimal_precision: column.decimal_precision,
1755 }
1756}
1757
1758fn apply_alter_operations_to_contract(
1759 contract: &mut crate::physical::CollectionContract,
1760 operations: &[AlterOperation],
1761) {
1762 if contract.table_def.is_none() {
1763 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1764 }
1765 for operation in operations {
1766 match operation {
1767 AlterOperation::AddColumn(column) => {
1768 if !contract
1769 .declared_columns
1770 .iter()
1771 .any(|existing| existing.name == column.name)
1772 {
1773 contract
1774 .declared_columns
1775 .push(declared_column_contract_from_ddl(column));
1776 }
1777 if let Some(table_def) = contract.table_def.as_mut() {
1778 if table_def.get_column(&column.name).is_none() {
1779 if let Ok(column_def) = column_def_from_ddl(column) {
1780 if column.primary_key {
1781 table_def.primary_key.push(column.name.clone());
1782 table_def.constraints.push(
1783 crate::storage::schema::Constraint::new(
1784 format!("pk_{}", column.name),
1785 crate::storage::schema::ConstraintType::PrimaryKey,
1786 )
1787 .on_columns(vec![column.name.clone()]),
1788 );
1789 }
1790 if column.unique {
1791 table_def.constraints.push(
1792 crate::storage::schema::Constraint::new(
1793 format!("uniq_{}", column.name),
1794 crate::storage::schema::ConstraintType::Unique,
1795 )
1796 .on_columns(vec![column.name.clone()]),
1797 );
1798 }
1799 if column.not_null {
1800 table_def.constraints.push(
1801 crate::storage::schema::Constraint::new(
1802 format!("not_null_{}", column.name),
1803 crate::storage::schema::ConstraintType::NotNull,
1804 )
1805 .on_columns(vec![column.name.clone()]),
1806 );
1807 }
1808 table_def.columns.push(column_def);
1809 }
1810 }
1811 }
1812 }
1813 AlterOperation::DropColumn(name) => {
1814 contract
1815 .declared_columns
1816 .retain(|column| column.name != *name);
1817 if let Some(table_def) = contract.table_def.as_mut() {
1818 if let Some(index) = table_def.column_index(name) {
1819 table_def.columns.remove(index);
1820 }
1821 table_def.primary_key.retain(|column| column != name);
1822 table_def.constraints.retain(|constraint| {
1823 !constraint.columns.iter().any(|column| column == name)
1824 });
1825 table_def
1826 .indexes
1827 .retain(|index| !index.columns.iter().any(|column| column == name));
1828 }
1829 }
1830 AlterOperation::RenameColumn { from, to } => {
1831 if contract
1832 .declared_columns
1833 .iter()
1834 .any(|column| column.name == *to)
1835 {
1836 continue;
1837 }
1838 if let Some(column) = contract
1839 .declared_columns
1840 .iter_mut()
1841 .find(|column| column.name == *from)
1842 {
1843 column.name = to.clone();
1844 }
1845 if let Some(table_def) = contract.table_def.as_mut() {
1846 if let Some(column) = table_def
1847 .columns
1848 .iter_mut()
1849 .find(|column| column.name == *from)
1850 {
1851 column.name = to.clone();
1852 }
1853 for primary_key in &mut table_def.primary_key {
1854 if *primary_key == *from {
1855 *primary_key = to.clone();
1856 }
1857 }
1858 for constraint in &mut table_def.constraints {
1859 for column in &mut constraint.columns {
1860 if *column == *from {
1861 *column = to.clone();
1862 }
1863 }
1864 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1865 for column in ref_columns {
1866 if *column == *from {
1867 *column = to.clone();
1868 }
1869 }
1870 }
1871 }
1872 for index in &mut table_def.indexes {
1873 for column in &mut index.columns {
1874 if *column == *from {
1875 *column = to.clone();
1876 }
1877 }
1878 }
1879 }
1880 }
1881 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1884 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1888 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1891 AlterOperation::SetAppendOnly(on) => {
1892 contract.append_only = *on;
1893 }
1894 AlterOperation::SetVersioned(_) => {}
1897 AlterOperation::EnableEvents(subscription) => {
1898 let mut subscription = subscription.clone();
1899 subscription.source = contract.name.clone();
1900 subscription.enabled = true;
1901 if let Some(existing) = contract
1902 .subscriptions
1903 .iter_mut()
1904 .find(|existing| existing.target_queue == subscription.target_queue)
1905 {
1906 *existing = subscription;
1907 } else {
1908 contract.subscriptions.push(subscription);
1909 }
1910 }
1911 AlterOperation::DisableEvents => {
1912 for subscription in &mut contract.subscriptions {
1913 subscription.enabled = false;
1914 }
1915 }
1916 AlterOperation::AddSubscription { name, descriptor } => {
1917 let mut sub = descriptor.clone();
1918 sub.name = name.clone();
1919 sub.source = contract.name.clone();
1920 sub.enabled = true;
1921 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1922 {
1923 *existing = sub;
1924 } else {
1925 contract.subscriptions.push(sub);
1926 }
1927 }
1928 AlterOperation::DropSubscription { name } => {
1929 contract.subscriptions.retain(|s| s.name != *name);
1930 }
1931 AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
1936 AlterOperation::SetRetention { duration_ms } => {
1937 contract.retention_duration_ms = Some(*duration_ms);
1938 }
1939 AlterOperation::UnsetRetention => {
1940 contract.retention_duration_ms = None;
1941 }
1942 }
1943 }
1944}
1945
1946pub(crate) fn retention_timestamp_column_exists(
1951 contract: &crate::physical::CollectionContract,
1952) -> bool {
1953 if contract.timestamps_enabled {
1954 return true;
1955 }
1956 if matches!(
1957 contract.declared_model,
1958 crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
1959 ) {
1960 return true;
1964 }
1965 contract
1966 .declared_columns
1967 .iter()
1968 .any(|column| is_temporal_data_type(&column.data_type))
1969}
1970
/// Returns `true` when `data_type` names one of the temporal column types,
/// compared ASCII case-insensitively: `TIMESTAMP`, `TIMESTAMPMS`,
/// `TIMESTAMP_MS`, `DATETIME` or `DATE`.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];
    TEMPORAL_TYPES
        .iter()
        .any(|candidate| data_type.eq_ignore_ascii_case(candidate))
}
1978
1979fn validate_event_subscriptions(
1980 runtime: &RedDBRuntime,
1981 source: &str,
1982 subscriptions: &[crate::catalog::SubscriptionDescriptor],
1983) -> RedDBResult<()> {
1984 for subscription in subscriptions
1985 .iter()
1986 .filter(|subscription| subscription.enabled)
1987 {
1988 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1989 return Err(RedDBError::Query(
1990 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1991 ));
1992 }
1993 validate_subscription_auth(runtime, source, subscription)?;
1994 if subscription.target_queue == source
1995 || subscription_would_create_cycle(
1996 &runtime.inner.db,
1997 source,
1998 &subscription.target_queue,
1999 )
2000 {
2001 return Err(RedDBError::Query(
2002 "subscription would create cycle".to_string(),
2003 ));
2004 }
2005 audit_subscription_redact_gap(runtime, source, subscription);
2006 }
2007 Ok(())
2008}
2009
2010fn validate_subscription_auth(
2011 runtime: &RedDBRuntime,
2012 source: &str,
2013 subscription: &crate::catalog::SubscriptionDescriptor,
2014) -> RedDBResult<()> {
2015 let auth_store = match runtime.inner.auth_store.read().clone() {
2016 Some(store) => store,
2017 None => return Ok(()),
2018 };
2019 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2020 Some(identity) => identity,
2021 None => return Ok(()),
2022 };
2023 let tenant = crate::runtime::impl_core::current_tenant();
2024 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2025
2026 if auth_store.iam_authorization_enabled() {
2027 let ctx = crate::auth::policies::EvalContext {
2028 principal_tenant: tenant.clone(),
2029 current_tenant: tenant.clone(),
2030 peer_ip: None,
2031 mfa_present: false,
2032 now_ms: crate::auth::now_ms(),
2033 principal_is_admin_role: role == crate::auth::Role::Admin,
2034 };
2035 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2036 if let Some(t) = tenant.as_deref() {
2037 source_resource = source_resource.with_tenant(t.to_string());
2038 }
2039 if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
2040 return Err(RedDBError::Query(format!(
2041 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2042 principal, source_resource.kind, source_resource.name
2043 )));
2044 }
2045
2046 let mut target_resource =
2047 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2048 if let Some(t) = tenant.as_deref() {
2049 target_resource = target_resource.with_tenant(t.to_string());
2050 }
2051 if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
2052 return Err(RedDBError::Query(format!(
2053 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2054 principal, target_resource.kind, target_resource.name
2055 )));
2056 }
2057 return Ok(());
2058 }
2059
2060 let ctx = crate::auth::privileges::AuthzContext {
2061 principal: &username,
2062 effective_role: role,
2063 tenant: tenant.as_deref(),
2064 };
2065 auth_store
2066 .check_grant(
2067 &ctx,
2068 crate::auth::privileges::Action::Select,
2069 &crate::auth::privileges::Resource::table_from_name(source),
2070 )
2071 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2072 auth_store
2073 .check_grant(
2074 &ctx,
2075 crate::auth::privileges::Action::Insert,
2076 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2077 )
2078 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2079 Ok(())
2080}
2081
2082fn audit_subscription_redact_gap(
2083 runtime: &RedDBRuntime,
2084 source: &str,
2085 subscription: &crate::catalog::SubscriptionDescriptor,
2086) {
2087 let auth_store = match runtime.inner.auth_store.read().clone() {
2088 Some(store) if store.iam_authorization_enabled() => store,
2089 _ => return,
2090 };
2091 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2092 Some(identity) => identity,
2093 None => return,
2094 };
2095 let tenant = crate::runtime::impl_core::current_tenant();
2096 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2097 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2098 if missing.is_empty() {
2099 return;
2100 }
2101
2102 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2103 tracing::warn!(
2104 target: "reddb::operator",
2105 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2106 source,
2107 subscription.target_queue,
2108 columns
2109 );
2110 let mut event = AuditEvent::builder("subscription_redact_gap")
2111 .principal(username)
2112 .source(AuditAuthSource::System)
2113 .resource(format!(
2114 "subscription:{}->{}",
2115 source, subscription.target_queue
2116 ))
2117 .outcome(Outcome::Success)
2118 .field(AuditFieldEscaper::field("source", source))
2119 .field(AuditFieldEscaper::field(
2120 "target_queue",
2121 subscription.target_queue.clone(),
2122 ))
2123 .field(AuditFieldEscaper::field(
2124 "subscription",
2125 subscription.name.clone(),
2126 ))
2127 .field(AuditFieldEscaper::field("columns", columns))
2128 .field(AuditFieldEscaper::field("role", role.as_str()));
2129 if let Some(t) = tenant {
2130 event = event.tenant(t);
2131 }
2132 runtime.inner.audit_log.record_event(event.build());
2133}
2134
2135fn subscription_redact_gap_columns(
2136 auth_store: &crate::auth::store::AuthStore,
2137 principal: &crate::auth::UserId,
2138 source: &str,
2139 subscription: &crate::catalog::SubscriptionDescriptor,
2140) -> BTreeSet<String> {
2141 let redacted: HashSet<String> = subscription
2142 .redact_fields
2143 .iter()
2144 .map(|field| field.to_ascii_lowercase())
2145 .collect();
2146 auth_store
2147 .effective_policies(principal)
2148 .iter()
2149 .flat_map(|policy| policy.statements.iter())
2150 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2151 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2152 .flat_map(|statement| statement.resources.iter())
2153 .filter_map(|resource| denied_column_for_source(resource, source))
2154 .filter(|column| !redact_covers_column(&redacted, source, column))
2155 .collect()
2156}
2157
2158fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2159 match pattern {
2160 crate::auth::policies::ActionPattern::Wildcard => true,
2161 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2162 crate::auth::policies::ActionPattern::Prefix(prefix) => {
2163 "select".len() > prefix.len() + 1
2164 && "select".starts_with(prefix)
2165 && "select".as_bytes()[prefix.len()] == b':'
2166 }
2167 }
2168}
2169
2170fn denied_column_for_source(
2171 resource: &crate::auth::policies::ResourcePattern,
2172 source: &str,
2173) -> Option<String> {
2174 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2175 return None;
2176 };
2177 if kind != "column" {
2178 return None;
2179 }
2180 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2181 (column.table_resource_name() == source).then_some(column.column)
2182}
2183
/// Reports whether the redact set covers `column` of `source`.
///
/// The set covers a column when it contains `*`, the ASCII-lowercased
/// column name, or the ASCII-lowercased `source.column` form. Entries in
/// `redacted` are compared as-is, so callers are expected to store them
/// lowercased.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(bare.as_str()) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(qualified.as_str())
}
2189
2190fn subscription_would_create_cycle(
2191 db: &crate::storage::unified::devx::RedDB,
2192 source: &str,
2193 target: &str,
2194) -> bool {
2195 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2196 for contract in db.collection_contracts() {
2197 for subscription in contract
2198 .subscriptions
2199 .into_iter()
2200 .filter(|subscription| subscription.enabled)
2201 {
2202 graph
2203 .entry(subscription.source)
2204 .or_default()
2205 .push(subscription.target_queue);
2206 }
2207 }
2208 graph
2209 .entry(source.to_string())
2210 .or_default()
2211 .push(target.to_string());
2212
2213 let mut stack = vec![target.to_string()];
2214 let mut seen = HashSet::new();
2215 while let Some(node) = stack.pop() {
2216 if node == source {
2217 return true;
2218 }
2219 if !seen.insert(node.clone()) {
2220 continue;
2221 }
2222 if let Some(next) = graph.get(&node) {
2223 stack.extend(next.iter().cloned());
2224 }
2225 }
2226 false
2227}
2228
2229pub(crate) fn ensure_event_target_queue_pub(
2230 runtime: &RedDBRuntime,
2231 queue: &str,
2232) -> RedDBResult<()> {
2233 ensure_event_target_queue(runtime, queue)
2234}
2235
2236fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2237 let store = runtime.inner.db.store();
2238 if store.get_collection(queue).is_some() {
2239 return Ok(());
2240 }
2241 store
2242 .create_collection(queue)
2243 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2244 runtime
2245 .inner
2246 .db
2247 .save_collection_contract(event_queue_collection_contract(queue))
2248 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2249 store.set_config_tree(
2250 &format!("queue.{queue}.mode"),
2251 &crate::serde_json::Value::String("fanout".to_string()),
2252 );
2253 Ok(())
2254}
2255
2256fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2257 let now = current_unix_ms();
2258 crate::physical::CollectionContract {
2259 name: queue.to_string(),
2260 declared_model: crate::catalog::CollectionModel::Queue,
2261 schema_mode: crate::catalog::SchemaMode::Dynamic,
2262 origin: crate::physical::ContractOrigin::Implicit,
2263 version: 1,
2264 created_at_unix_ms: now,
2265 updated_at_unix_ms: now,
2266 default_ttl_ms: None,
2267 vector_dimension: None,
2268 vector_metric: None,
2269 context_index_fields: Vec::new(),
2270 declared_columns: Vec::new(),
2271 table_def: None,
2272 timestamps_enabled: false,
2273 context_index_enabled: false,
2274 metrics_raw_retention_ms: None,
2275 metrics_rollup_policies: Vec::new(),
2276 metrics_tenant_identity: None,
2277 metrics_namespace: None,
2278 append_only: true,
2279 subscriptions: Vec::new(),
2280 session_key: None,
2281 session_gap_ms: None,
2282 retention_duration_ms: None,
2283 }
2284}
2285
2286fn build_table_def_from_create_table(
2287 query: &CreateTableQuery,
2288) -> RedDBResult<crate::storage::schema::TableDef> {
2289 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2290 for column in &query.columns {
2291 if column.primary_key {
2292 table.primary_key.push(column.name.clone());
2293 table.constraints.push(
2294 crate::storage::schema::Constraint::new(
2295 format!("pk_{}", column.name),
2296 crate::storage::schema::ConstraintType::PrimaryKey,
2297 )
2298 .on_columns(vec![column.name.clone()]),
2299 );
2300 }
2301 if column.unique {
2302 table.constraints.push(
2303 crate::storage::schema::Constraint::new(
2304 format!("uniq_{}", column.name),
2305 crate::storage::schema::ConstraintType::Unique,
2306 )
2307 .on_columns(vec![column.name.clone()]),
2308 );
2309 }
2310 if column.not_null {
2311 table.constraints.push(
2312 crate::storage::schema::Constraint::new(
2313 format!("not_null_{}", column.name),
2314 crate::storage::schema::ConstraintType::NotNull,
2315 )
2316 .on_columns(vec![column.name.clone()]),
2317 );
2318 }
2319 table.columns.push(column_def_from_ddl(column)?);
2320 }
2321 if query.timestamps {
2326 table.columns.push(
2327 crate::storage::schema::ColumnDef::new(
2328 "created_at".to_string(),
2329 crate::storage::schema::DataType::UnsignedInteger,
2330 )
2331 .not_null(),
2332 );
2333 table.columns.push(
2334 crate::storage::schema::ColumnDef::new(
2335 "updated_at".to_string(),
2336 crate::storage::schema::DataType::UnsignedInteger,
2337 )
2338 .not_null(),
2339 );
2340 table.constraints.push(
2341 crate::storage::schema::Constraint::new(
2342 "not_null_created_at".to_string(),
2343 crate::storage::schema::ConstraintType::NotNull,
2344 )
2345 .on_columns(vec!["created_at".to_string()]),
2346 );
2347 table.constraints.push(
2348 crate::storage::schema::Constraint::new(
2349 "not_null_updated_at".to_string(),
2350 crate::storage::schema::ConstraintType::NotNull,
2351 )
2352 .on_columns(vec!["updated_at".to_string()]),
2353 );
2354 }
2355 table
2356 .validate()
2357 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2358 Ok(table)
2359}
2360
2361fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2362 let data_type = resolve_declared_data_type(&column.data_type)
2363 .map_err(|err| RedDBError::Query(err.to_string()))?;
2364 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2365 if column.not_null {
2366 column_def = column_def.not_null();
2367 }
2368 if let Some(default) = &column.default {
2369 column_def = column_def.with_default(default.as_bytes().to_vec());
2370 }
2371 if column.compress.unwrap_or(0) > 0 {
2372 column_def = column_def.compressed();
2373 }
2374 if !column.enum_variants.is_empty() {
2375 column_def = column_def.with_variants(column.enum_variants.clone());
2376 }
2377 if let Some(precision) = column.decimal_precision {
2378 column_def = column_def.with_precision(precision);
2379 }
2380 if let Some(element_type) = &column.array_element {
2381 column_def = column_def.with_element_type(
2382 resolve_declared_data_type(element_type)
2383 .map_err(|err| RedDBError::Query(err.to_string()))?,
2384 );
2385 }
2386 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2387 if column.unique {
2388 column_def = column_def.with_metadata("unique", "true");
2389 }
2390 if column.primary_key {
2391 column_def = column_def.with_metadata("primary_key", "true");
2392 }
2393 Ok(column_def)
2394}
2395
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        // A clock set before the epoch falls back to a zero duration.
        Err(_) => 0,
    }
}
2402
2403#[cfg(test)]
2404mod tests {
2405 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2406 use crate::auth::store::{AuthStore, PrincipalRef};
2407 use crate::auth::UserId;
2408 use crate::auth::{AuthConfig, Role};
2409 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2410 use crate::storage::schema::Value;
2411 use crate::{RedDBOptions, RedDBRuntime};
2412 use std::sync::Arc;
2413
2414 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2415 Policy {
2416 id: id.to_string(),
2417 version: 1,
2418 tenant: None,
2419 created_at: 0,
2420 updated_at: 0,
2421 statements: vec![Statement {
2422 sid: None,
2423 effect: Effect::Allow,
2424 actions: vec![ActionPattern::Exact(action.to_string())],
2425 resources: vec![ResourcePattern::Exact {
2426 kind: "collection".to_string(),
2427 name: collection.to_string(),
2428 }],
2429 condition: None,
2430 }],
2431 }
2432 }
2433
2434 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2435 let store = Arc::new(AuthStore::new(AuthConfig::default()));
2436 *rt.inner.auth_store.write() = Some(store.clone());
2437 store
2438 }
2439
2440 #[test]
2441 fn drop_denied_without_iam_policy() {
2442 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2443 rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2444 let store = wire_auth_store(&rt);
2445 let select_only = Policy {
2447 id: "select-only".to_string(),
2448 version: 1,
2449 tenant: None,
2450 created_at: 0,
2451 updated_at: 0,
2452 statements: vec![Statement {
2453 sid: None,
2454 effect: Effect::Allow,
2455 actions: vec![ActionPattern::Exact("select".to_string())],
2456 resources: vec![ResourcePattern::Wildcard],
2457 condition: None,
2458 }],
2459 };
2460 store.put_policy_internal(select_only).unwrap();
2461 let alice = UserId::from_parts(None, "alice");
2462 store
2463 .attach_policy(PrincipalRef::User(alice), "select-only")
2464 .unwrap();
2465 set_current_auth_identity("alice".to_string(), Role::Write);
2466 let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2467 clear_current_auth_identity();
2468 assert!(
2469 format!("{err}").contains("denied by IAM policy"),
2470 "got: {err}"
2471 );
2472 }
2473
2474 #[test]
2475 fn drop_allowed_with_explicit_iam_policy() {
2476 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2477 rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2478 let store = wire_auth_store(&rt);
2479 let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2480 store.put_policy_internal(policy).unwrap();
2481 let bob = UserId::from_parts(None, "bob");
2482 store
2483 .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2484 .unwrap();
2485 set_current_auth_identity("bob".to_string(), Role::Write);
2486 rt.execute_query("DROP TABLE bar").unwrap();
2487 clear_current_auth_identity();
2488 }
2489
2490 #[test]
2491 fn drop_allowed_with_wildcard_iam_policy() {
2492 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2493 rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2494 let store = wire_auth_store(&rt);
2495 let policy = Policy {
2496 id: "allow-drop-all".to_string(),
2497 version: 1,
2498 tenant: None,
2499 created_at: 0,
2500 updated_at: 0,
2501 statements: vec![Statement {
2502 sid: None,
2503 effect: Effect::Allow,
2504 actions: vec![ActionPattern::Exact("drop".to_string())],
2505 resources: vec![ResourcePattern::Wildcard],
2506 condition: None,
2507 }],
2508 };
2509 store.put_policy_internal(policy).unwrap();
2510 let carl = UserId::from_parts(None, "carl");
2511 store
2512 .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2513 .unwrap();
2514 set_current_auth_identity("carl".to_string(), Role::Write);
2515 rt.execute_query("DROP TABLE baz").unwrap();
2516 clear_current_auth_identity();
2517 }
2518
2519 #[test]
2520 fn truncate_denied_without_iam_policy() {
2521 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2522 rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2523 let store = wire_auth_store(&rt);
2524 let select_only = Policy {
2526 id: "select-only-2".to_string(),
2527 version: 1,
2528 tenant: None,
2529 created_at: 0,
2530 updated_at: 0,
2531 statements: vec![Statement {
2532 sid: None,
2533 effect: Effect::Allow,
2534 actions: vec![ActionPattern::Exact("select".to_string())],
2535 resources: vec![ResourcePattern::Wildcard],
2536 condition: None,
2537 }],
2538 };
2539 store.put_policy_internal(select_only).unwrap();
2540 let dana = UserId::from_parts(None, "dana");
2541 store
2542 .attach_policy(PrincipalRef::User(dana), "select-only-2")
2543 .unwrap();
2544 set_current_auth_identity("dana".to_string(), Role::Write);
2545 let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2546 clear_current_auth_identity();
2547 assert!(
2548 format!("{err}").contains("denied by IAM policy"),
2549 "got: {err}"
2550 );
2551 }
2552
2553 #[test]
2554 fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2555 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2556 rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2557 .unwrap();
2558 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2559 .unwrap();
2560 rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2561 .unwrap();
2562
2563 let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2564 assert_eq!(truncated.statement_type, "truncate");
2565 assert_eq!(truncated.affected_rows, 0);
2566
2567 let empty = rt.execute_query("SELECT id FROM users").unwrap();
2568 assert!(empty.result.records.is_empty());
2569
2570 rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2571 .unwrap();
2572 let selected = rt
2573 .execute_query("SELECT name FROM users WHERE id = 3")
2574 .unwrap();
2575 let name = selected.result.records[0].get("name").unwrap();
2576 assert_eq!(name, &Value::text("cy"));
2577 assert!(rt.db().collection_contract("users").is_some());
2578 assert!(rt
2579 .inner
2580 .index_store
2581 .list_indices("users")
2582 .iter()
2583 .any(|index| index.name == "idx_users_id"));
2584 }
2585
2586 #[test]
2587 fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2588 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2589 rt.execute_query("CREATE QUEUE tasks").unwrap();
2590 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2591
2592 let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2593 assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2594
2595 rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2596 let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2597 assert_eq!(
2598 len.result.records[0].get("len"),
2599 Some(&Value::UnsignedInteger(0))
2600 );
2601 }
2602
2603 #[test]
2604 fn truncate_system_schema_is_read_only() {
2605 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2606 let err = rt
2607 .execute_query("TRUNCATE COLLECTION red.collections")
2608 .unwrap_err();
2609 assert!(format!("{err}").contains("system schema is read-only"));
2610 }
2611
2612 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2615 let result = rt
2616 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2617 .expect("peek queue");
2618 result
2619 .result
2620 .records
2621 .iter()
2622 .map(
2623 |record| match record.get("payload").expect("payload column") {
2624 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2625 other => panic!("expected JSON queue payload, got {other:?}"),
2626 },
2627 )
2628 .collect()
2629 }
2630
2631 #[test]
2634 fn truncate_event_enabled_table_emits_single_truncate_event() {
2635 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2636 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2637 .unwrap();
2638 rt.execute_query(
2639 "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2640 )
2641 .unwrap();
2642
2643 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2645
2646 rt.execute_query("TRUNCATE TABLE users").unwrap();
2647
2648 let events = queue_payloads(&rt, "users_events");
2649 assert_eq!(
2651 events.len(),
2652 1,
2653 "expected 1 truncate event, got {}",
2654 events.len()
2655 );
2656 let ev = events[0].as_object().expect("event is object");
2657 assert_eq!(
2658 ev.get("op").and_then(crate::json::Value::as_str),
2659 Some("truncate")
2660 );
2661 assert_eq!(
2662 ev.get("collection").and_then(crate::json::Value::as_str),
2663 Some("users")
2664 );
2665 assert_eq!(
2666 ev.get("entities_count")
2667 .and_then(crate::json::Value::as_u64),
2668 Some(3)
2669 );
2670 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2671 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2672 assert!(ev
2673 .get("event_id")
2674 .and_then(crate::json::Value::as_str)
2675 .is_some_and(|s| !s.is_empty()));
2676 }
2677
2678 #[test]
2680 fn truncate_no_events_collection_emits_nothing() {
2681 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2682 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2683 .unwrap();
2684 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2685 .unwrap();
2686 rt.execute_query("TRUNCATE TABLE plain").unwrap();
2688 let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2690 assert!(rows.result.records.is_empty());
2691 }
2692
2693 #[test]
2697 fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2698 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2699 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2700 .unwrap();
2701 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2702 .unwrap();
2703
2704 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2706
2707 rt.execute_query("DROP TABLE users").unwrap();
2708
2709 let events = queue_payloads(&rt, "users_events");
2711 assert_eq!(
2712 events.len(),
2713 1,
2714 "expected 1 collection_dropped event, got {}",
2715 events.len()
2716 );
2717 let ev = events[0].as_object().expect("event is object");
2718 assert_eq!(
2719 ev.get("op").and_then(crate::json::Value::as_str),
2720 Some("collection_dropped")
2721 );
2722 assert_eq!(
2723 ev.get("collection").and_then(crate::json::Value::as_str),
2724 Some("users")
2725 );
2726 assert_eq!(
2727 ev.get("final_entities_count")
2728 .and_then(crate::json::Value::as_u64),
2729 Some(2)
2730 );
2731 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2732 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2733 assert!(ev
2734 .get("event_id")
2735 .and_then(crate::json::Value::as_str)
2736 .is_some_and(|s| !s.is_empty()));
2737
2738 let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2740 assert!(
2741 format!("{err}").contains("users"),
2742 "expected not-found error"
2743 );
2744 }
2745
2746 #[test]
2749 fn drop_no_events_collection_emits_nothing() {
2750 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2751 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2752 .unwrap();
2753 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2754 .unwrap();
2755 rt.execute_query("DROP TABLE plain").unwrap();
2756 let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2758 assert!(format!("{err}").contains("plain"));
2759 }
2760
2761 #[test]
2765 fn ops_filter_insert_only_ignores_update_and_delete() {
2766 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2767 rt.execute_query(
2768 "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2769 )
2770 .unwrap();
2771 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
2772 .unwrap();
2773 rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
2774 .unwrap();
2775 rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
2776
2777 let events = queue_payloads(&rt, "items_events");
2778 assert_eq!(
2780 events.len(),
2781 1,
2782 "expected 1 insert event, got {}",
2783 events.len()
2784 );
2785 assert_eq!(
2786 events[0]
2787 .as_object()
2788 .unwrap()
2789 .get("op")
2790 .and_then(crate::json::Value::as_str),
2791 Some("insert")
2792 );
2793 }
2794
2795 #[test]
2797 fn where_filter_skips_rows_that_do_not_match() {
2798 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2799 rt.execute_query(
2800 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
2801 )
2802 .unwrap();
2803
2804 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
2806 .unwrap();
2807 rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
2809 .unwrap();
2810
2811 let events = queue_payloads(&rt, "users_events");
2812 assert_eq!(
2813 events.len(),
2814 1,
2815 "expected 1 event (only active), got {}",
2816 events.len()
2817 );
2818 let ev = events[0].as_object().unwrap();
2819 assert_eq!(
2820 ev.get("op").and_then(crate::json::Value::as_str),
2821 Some("insert")
2822 );
2823 let after = ev.get("after").unwrap().as_object().unwrap();
2824 assert_eq!(
2825 after.get("status").and_then(crate::json::Value::as_str),
2826 Some("active")
2827 );
2828 }
2829
2830 #[test]
2832 fn ops_filter_and_where_filter_combined() {
2833 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2834 rt.execute_query(
2835 "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
2836 )
2837 .unwrap();
2838
2839 rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
2841 .unwrap();
2842 rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
2844 .unwrap();
2845 rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
2847 .unwrap();
2848 rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
2850
2851 let events = queue_payloads(&rt, "items_events");
2852 assert_eq!(
2854 events.len(),
2855 1,
2856 "expected 1 event, got {}: {events:?}",
2857 events.len()
2858 );
2859 assert_eq!(
2860 events[0]
2861 .as_object()
2862 .unwrap()
2863 .get("op")
2864 .and_then(crate::json::Value::as_str),
2865 Some("insert")
2866 );
2867 }
2868
2869 #[test]
2871 fn where_filter_on_delete_checks_before_state() {
2872 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2873 rt.execute_query(
2874 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
2875 )
2876 .unwrap();
2877
2878 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
2879 .unwrap();
2880
2881 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
2883 rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
2885
2886 let events = queue_payloads(&rt, "users_events");
2887 assert_eq!(
2888 events.len(),
2889 1,
2890 "expected 1 delete event, got {}",
2891 events.len()
2892 );
2893 let ev = events[0].as_object().unwrap();
2894 assert_eq!(
2895 ev.get("op").and_then(crate::json::Value::as_str),
2896 Some("delete")
2897 );
2898 }
2899
2900 #[test]
2904 fn alter_add_column_on_event_enabled_table_succeeds() {
2905 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2906 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2907 .unwrap();
2908 rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
2910 .unwrap();
2911 let contract = rt.db().collection_contract("users").unwrap();
2913 assert!(
2914 contract.declared_columns.iter().any(|c| c.name == "phone"),
2915 "phone column should be in contract"
2916 );
2917 assert!(
2919 contract.subscriptions.iter().any(|s| s.enabled),
2920 "subscription should remain enabled"
2921 );
2922 }
2923
2924 #[test]
2927 fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
2928 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2929 rt.execute_query(
2930 "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
2931 )
2932 .unwrap();
2933 rt.execute_query("ALTER TABLE items DROP COLUMN secret")
2935 .unwrap();
2936 let contract = rt.db().collection_contract("items").unwrap();
2937 assert!(
2938 !contract.declared_columns.iter().any(|c| c.name == "secret"),
2939 "secret column should be removed"
2940 );
2941 rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
2943 .unwrap();
2944 assert!(
2946 contract.subscriptions.iter().any(|s| s.enabled),
2947 "subscription should remain enabled"
2948 );
2949 }
2950}