1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
15fn vault_master_key_ref(collection: &str) -> String {
16 format!("red.vault.{collection}.master_key")
17}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 crate::reserved_fields::ensure_no_reserved_public_item_fields(
37 query.columns.iter().map(|column| column.name.as_str()),
38 &format!("table '{}'", query.name),
39 )?;
40 let exists = store.get_collection(&query.name).is_some();
42 if exists {
43 if query.if_not_exists {
44 return Ok(RuntimeQueryResult::ok_message(
45 raw_query.to_string(),
46 &format!("table '{}' already exists", query.name),
47 "create",
48 ));
49 }
50 return Err(RedDBError::Query(format!(
51 "table '{}' already exists",
52 query.name
53 )));
54 }
55
56 let contract = collection_contract_from_create_table(query)?;
59 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60 store
62 .create_collection(&query.name)
63 .map_err(|err| RedDBError::Internal(err.to_string()))?;
64 for subscription in &contract.subscriptions {
65 ensure_event_target_queue(self, &subscription.target_queue)?;
66 }
67 if let Some(default_ttl_ms) = query.default_ttl_ms {
68 self.inner
69 .db
70 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71 }
72 self.inner
73 .db
74 .save_collection_contract(contract)
75 .map_err(|err| RedDBError::Internal(err.to_string()))?;
76 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77 store.set_config_tree(
78 &format!("red.collection_tenants.{}", query.name),
79 &crate::serde_json::Value::String(tenant_id),
80 );
81 }
82 self.inner
83 .db
84 .persist_metadata()
85 .map_err(|err| RedDBError::Internal(err.to_string()))?;
86 self.refresh_table_planner_stats(&query.name);
87 self.invalidate_result_cache();
88 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99 if let Some(spec) = &query.partition_by {
106 let kind_str = match spec.kind {
107 crate::storage::query::ast::PartitionKind::Range => "range",
108 crate::storage::query::ast::PartitionKind::List => "list",
109 crate::storage::query::ast::PartitionKind::Hash => "hash",
110 };
111 store.set_config_tree(
112 &format!("partition.{}.by", query.name),
113 &crate::serde_json::Value::String(kind_str.to_string()),
114 );
115 store.set_config_tree(
116 &format!("partition.{}.column", query.name),
117 &crate::serde_json::Value::String(spec.column.clone()),
118 );
119 }
120
121 if let Some(col) = &query.tenant_by {
132 store.set_config_tree(
133 &format!("tenant_tables.{}.column", query.name),
134 &crate::serde_json::Value::String(col.clone()),
135 );
136 self.register_tenant_table(&query.name, col);
137 }
138
139 let ttl_suffix = query
140 .default_ttl_ms
141 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142 .unwrap_or_default();
143
144 let tenant_suffix = query
145 .tenant_by
146 .as_ref()
147 .map(|col| format!(" (tenant-scoped by {col})"))
148 .unwrap_or_default();
149
150 Ok(RuntimeQueryResult::ok_message(
151 raw_query.to_string(),
152 &format!(
153 "table '{}' created{}{}",
154 query.name, ttl_suffix, tenant_suffix
155 ),
156 "create",
157 ))
158 }
159
160 fn execute_create_keyed_collection(
161 &self,
162 raw_query: &str,
163 query: &CreateTableQuery,
164 ) -> RedDBResult<RuntimeQueryResult> {
165 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166 if is_system_schema_name(&query.name) {
167 return Err(RedDBError::Query("system schema is read-only".to_string()));
168 }
169 let store = self.inner.db.store();
170 let label = polymorphic_resolver::model_name(query.collection_model);
171 if store.get_collection(&query.name).is_some() {
172 if query.if_not_exists {
173 return Ok(RuntimeQueryResult::ok_message(
174 raw_query.to_string(),
175 &format!("{label} '{}' already exists", query.name),
176 "create",
177 ));
178 }
179 return Err(RedDBError::Query(format!(
180 "{label} '{}' already exists",
181 query.name
182 )));
183 }
184
185 store
186 .create_collection(&query.name)
187 .map_err(|err| RedDBError::Internal(err.to_string()))?;
188 if query.collection_model == CollectionModel::Vault {
189 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190 let key_scope = if query.vault_own_master_key {
191 "own"
192 } else {
193 "cluster"
194 };
195 store.set_config_tree(
196 &format!("red.vault.{}.key_scope", query.name),
197 &crate::serde_json::Value::String(key_scope.to_string()),
198 );
199 store.set_config_tree(
200 &format!("red.vault.{}.status", query.name),
201 &crate::serde_json::Value::String("sealed".to_string()),
202 );
203 }
204 if query.collection_model == CollectionModel::Metrics {
205 for spec in &query.metrics_rollup_policies {
206 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207 .ok_or_else(|| {
208 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209 })?;
210 if policy.source != "raw" {
211 return Err(RedDBError::Query(format!(
212 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213 spec
214 )));
215 }
216 if !matches!(
217 policy.aggregation.as_str(),
218 "avg" | "sum" | "min" | "max" | "count"
219 ) {
220 return Err(RedDBError::Query(format!(
221 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222 spec
223 )));
224 }
225 }
226 if let Some(raw_retention_ms) = query.default_ttl_ms {
227 self.inner
228 .db
229 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230 store.set_config_tree(
231 &format!("red.metrics.{}.raw_retention_ms", query.name),
232 &crate::serde_json::Value::Number(raw_retention_ms as f64),
233 );
234 }
235 let tenant_identity = query
236 .tenant_by
237 .clone()
238 .unwrap_or_else(|| "current_tenant".to_string());
239 store.set_config_tree(
240 &format!("red.metrics.{}.tenant_identity", query.name),
241 &crate::serde_json::Value::String(tenant_identity),
242 );
243 store.set_config_tree(
244 &format!("red.metrics.{}.namespace", query.name),
245 &crate::serde_json::Value::String("default".to_string()),
246 );
247 if !query.metrics_rollup_policies.is_empty() {
248 store.set_config_tree(
249 &format!("red.metrics.{}.rollup_policies", query.name),
250 &crate::serde_json::Value::Array(
251 query
252 .metrics_rollup_policies
253 .iter()
254 .cloned()
255 .map(crate::serde_json::Value::String)
256 .collect(),
257 ),
258 );
259 }
260 }
261 let contract = if query.collection_model == CollectionModel::Metrics {
262 metrics_collection_contract(query)
263 } else {
264 keyed_collection_contract(&query.name, query.collection_model)
265 };
266 self.inner
267 .db
268 .save_collection_contract(contract)
269 .map_err(|err| RedDBError::Internal(err.to_string()))?;
270 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271 store.set_config_tree(
272 &format!("red.collection_tenants.{}", query.name),
273 &crate::serde_json::Value::String(tenant_id),
274 );
275 }
276 self.inner
277 .db
278 .persist_metadata()
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 self.invalidate_result_cache();
281
282 Ok(RuntimeQueryResult::ok_message(
283 raw_query.to_string(),
284 &format!("{label} '{}' created", query.name),
285 "create",
286 ))
287 }
288
289 pub fn execute_create_collection(
290 &self,
291 raw_query: &str,
292 query: &CreateCollectionQuery,
293 ) -> RedDBResult<RuntimeQueryResult> {
294 let model = match query.kind.as_str() {
295 "graph" => CollectionModel::Graph,
296 "document" => CollectionModel::Document,
297 "metrics" => CollectionModel::Metrics,
298 "vector.turbo" => {
299 let dimension = query.vector_dimension.ok_or_else(|| {
300 RedDBError::Query(
301 "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
302 )
303 })?;
304 let create = CreateVectorQuery {
305 name: query.name.clone(),
306 dimension,
307 metric: query
308 .vector_metric
309 .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
310 if_not_exists: query.if_not_exists,
311 };
312 let result = self.execute_create_vector(raw_query, &create)?;
313 let store = self.inner.db.store();
320 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
321 self.inner
322 .db
323 .persist_metadata()
324 .map_err(|err| RedDBError::Internal(err.to_string()))?;
325 let _ = self.inner.db.turbo_state(&query.name);
330 return Ok(result);
331 }
332 "blockchain" => CollectionModel::Table,
337 other => {
338 return Err(RedDBError::Query(format!(
339 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
340 )));
341 }
342 };
343 let create = CreateTableQuery {
344 collection_model: model,
345 name: query.name.clone(),
346 columns: Vec::new(),
347 if_not_exists: query.if_not_exists,
348 default_ttl_ms: None,
349 metrics_rollup_policies: Vec::new(),
350 context_index_fields: Vec::new(),
351 context_index_enabled: false,
352 timestamps: false,
353 partition_by: None,
354 tenant_by: None,
355 append_only: false,
356 subscriptions: Vec::new(),
357 vault_own_master_key: false,
358 };
359 let result = self.execute_create_table(raw_query, &create)?;
360 if query.kind == "blockchain" {
361 self.install_blockchain_kind(&query.name)?;
362 }
363 if !query.allowed_signers.is_empty() {
368 let actor = crate::runtime::impl_core::current_user_projected()
369 .unwrap_or_else(|| "@system/create-collection".to_string());
370 crate::runtime::signed_writes_kind::install(
371 &self.inner.db.store(),
372 &query.name,
373 &query.allowed_signers,
374 &actor,
375 );
376 }
377 Ok(result)
378 }
379
380 fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
384 use crate::runtime::blockchain_kind;
385 use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
386 use std::sync::Arc;
387
388 let store = self.inner.db.store();
389 blockchain_kind::mark_as_chain(&store, name);
390
391 let existing_tip = blockchain_kind::chain_tip(&store, name);
392 if existing_tip.height.is_some() {
393 return Ok(());
394 }
395
396 let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
397 let named: std::collections::HashMap<String, crate::storage::schema::Value> =
398 fields.into_iter().collect();
399 let entity = UnifiedEntity::new(
400 EntityId::new(0),
401 EntityKind::TableRow {
402 table: Arc::from(name),
403 row_id: 0,
404 },
405 EntityData::Row(RowData {
406 columns: Vec::new(),
407 named: Some(named),
408 schema: None,
409 }),
410 );
411 store
412 .insert_auto(name, entity)
413 .map_err(|err| RedDBError::Internal(err.to_string()))?;
414 if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
417 self.inner
418 .chain_tip_cache
419 .lock()
420 .insert(name.to_string(), tip);
421 }
422 Ok(())
423 }
424
425 pub fn execute_create_vector(
426 &self,
427 raw_query: &str,
428 query: &CreateVectorQuery,
429 ) -> RedDBResult<RuntimeQueryResult> {
430 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
431 if is_system_schema_name(&query.name) {
432 return Err(RedDBError::Query("system schema is read-only".to_string()));
433 }
434 let store = self.inner.db.store();
435 if store.get_collection(&query.name).is_some() {
436 if query.if_not_exists {
437 return Ok(RuntimeQueryResult::ok_message(
438 raw_query.to_string(),
439 &format!("vector '{}' already exists", query.name),
440 "create",
441 ));
442 }
443 return Err(RedDBError::Query(format!(
444 "vector '{}' already exists",
445 query.name
446 )));
447 }
448
449 store
450 .create_collection(&query.name)
451 .map_err(|err| RedDBError::Internal(err.to_string()))?;
452 self.inner
453 .db
454 .save_collection_contract(vector_collection_contract(query))
455 .map_err(|err| RedDBError::Internal(err.to_string()))?;
456 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
457 store.set_config_tree(
458 &format!("red.collection_tenants.{}", query.name),
459 &crate::serde_json::Value::String(tenant_id),
460 );
461 }
462 self.inner
463 .db
464 .persist_metadata()
465 .map_err(|err| RedDBError::Internal(err.to_string()))?;
466 self.invalidate_result_cache();
467
468 Ok(RuntimeQueryResult::ok_message(
469 raw_query.to_string(),
470 &format!("vector '{}' created", query.name),
471 "create",
472 ))
473 }
474
475 fn provision_vault_key_material(
476 &self,
477 collection: &str,
478 own_master_key: bool,
479 ) -> RedDBResult<()> {
480 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
481 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
482 })?;
483 if !auth_store.is_vault_backed() {
484 return Err(RedDBError::Query(
485 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
486 ));
487 }
488
489 if auth_store.vault_secret_key().is_none() {
490 let key = crate::auth::store::random_bytes(32);
491 auth_store
492 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
493 .map_err(|err| RedDBError::Query(err.to_string()))?;
494 }
495
496 if own_master_key {
497 let key = crate::auth::store::random_bytes(32);
498 auth_store
499 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
500 .map_err(|err| RedDBError::Query(err.to_string()))?;
501 }
502
503 Ok(())
504 }
505
506 pub fn execute_drop_table(
510 &self,
511 raw_query: &str,
512 query: &DropTableQuery,
513 ) -> RedDBResult<RuntimeQueryResult> {
514 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
515 let store = self.inner.db.store();
516
517 if is_system_schema_name(&query.name) {
518 return Err(RedDBError::Query("system schema is read-only".to_string()));
519 }
520
521 let exists = store.get_collection(&query.name).is_some();
522 if !exists {
523 if query.if_exists {
524 return Ok(RuntimeQueryResult::ok_message(
525 raw_query.to_string(),
526 &format!("table '{}' does not exist", query.name),
527 "drop",
528 ));
529 }
530 return Err(RedDBError::NotFound(format!(
531 "table '{}' not found",
532 query.name
533 )));
534 }
535 let actual =
536 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
537 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
538
539 let final_count = store
542 .get_collection(&query.name)
543 .map(|manager| manager.query_all(|_| true).len() as u64)
544 .unwrap_or(0);
545 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
546 self,
547 &query.name,
548 final_count,
549 )?;
550
551 let orphaned_indices: Vec<String> = self
552 .inner
553 .index_store
554 .list_indices(&query.name)
555 .into_iter()
556 .map(|index| index.name)
557 .collect();
558 for name in &orphaned_indices {
559 self.inner.index_store.drop_index(name, &query.name);
560 }
561
562 store
563 .drop_collection(&query.name)
564 .map_err(|err| RedDBError::Internal(err.to_string()))?;
565 self.inner.db.invalidate_vector_index(&query.name);
566 self.inner.db.clear_collection_default_ttl_ms(&query.name);
567 self.inner
568 .db
569 .remove_collection_contract(&query.name)
570 .map_err(|err| RedDBError::Internal(err.to_string()))?;
571 self.clear_table_planner_stats(&query.name);
572 self.invalidate_result_cache();
573 if let Some(store) = self.inner.auth_store.read().clone() {
577 store.invalidate_visible_collections_cache();
578 }
579 self.inner
580 .db
581 .persist_metadata()
582 .map_err(|err| RedDBError::Internal(err.to_string()))?;
583 self.schema_vocabulary_apply(
589 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
590 collection: query.name.clone(),
591 },
592 );
593
594 Ok(RuntimeQueryResult::ok_message(
595 raw_query.to_string(),
596 &format!("table '{}' dropped", query.name),
597 "drop",
598 ))
599 }
600
601 pub fn execute_drop_graph(
602 &self,
603 raw_query: &str,
604 query: &DropGraphQuery,
605 ) -> RedDBResult<RuntimeQueryResult> {
606 self.execute_drop_typed_collection(
607 raw_query,
608 &query.name,
609 query.if_exists,
610 CollectionModel::Graph,
611 "graph",
612 )
613 }
614
615 pub fn execute_drop_vector(
616 &self,
617 raw_query: &str,
618 query: &DropVectorQuery,
619 ) -> RedDBResult<RuntimeQueryResult> {
620 self.execute_drop_typed_collection(
621 raw_query,
622 &query.name,
623 query.if_exists,
624 CollectionModel::Vector,
625 "vector",
626 )
627 }
628
629 pub fn execute_drop_document(
630 &self,
631 raw_query: &str,
632 query: &DropDocumentQuery,
633 ) -> RedDBResult<RuntimeQueryResult> {
634 self.execute_drop_typed_collection(
635 raw_query,
636 &query.name,
637 query.if_exists,
638 CollectionModel::Document,
639 "document",
640 )
641 }
642
643 pub fn execute_drop_kv(
644 &self,
645 raw_query: &str,
646 query: &DropKvQuery,
647 ) -> RedDBResult<RuntimeQueryResult> {
648 let label = polymorphic_resolver::model_name(query.model);
649 self.execute_drop_typed_collection(
650 raw_query,
651 &query.name,
652 query.if_exists,
653 query.model,
654 label,
655 )
656 }
657
658 pub fn execute_drop_collection(
659 &self,
660 raw_query: &str,
661 query: &DropCollectionQuery,
662 ) -> RedDBResult<RuntimeQueryResult> {
663 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
664 if is_system_schema_name(&query.name) {
665 return Err(RedDBError::Query("system schema is read-only".to_string()));
666 }
667 let store = self.inner.db.store();
668 if store.get_collection(&query.name).is_none() {
669 if query.if_exists {
670 return Ok(RuntimeQueryResult::ok_message(
671 raw_query.to_string(),
672 &format!("collection '{}' does not exist", query.name),
673 "drop",
674 ));
675 }
676 return Err(RedDBError::NotFound(format!(
677 "collection '{}' not found",
678 query.name
679 )));
680 }
681
682 let actual =
683 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
684 if let Some(expected) = query.model {
685 polymorphic_resolver::ensure_model_match(expected, actual)?;
686 }
687
688 match actual {
689 CollectionModel::Table => self.execute_drop_table(
690 raw_query,
691 &DropTableQuery {
692 name: query.name.clone(),
693 if_exists: query.if_exists,
694 },
695 ),
696 CollectionModel::TimeSeries => self.execute_drop_timeseries(
697 raw_query,
698 &DropTimeSeriesQuery {
699 name: query.name.clone(),
700 if_exists: query.if_exists,
701 },
702 ),
703 CollectionModel::Queue => self.execute_drop_queue(
704 raw_query,
705 &DropQueueQuery {
706 name: query.name.clone(),
707 if_exists: query.if_exists,
708 },
709 ),
710 CollectionModel::Graph => self.execute_drop_graph(
711 raw_query,
712 &DropGraphQuery {
713 name: query.name.clone(),
714 if_exists: query.if_exists,
715 },
716 ),
717 CollectionModel::Vector => self.execute_drop_vector(
718 raw_query,
719 &DropVectorQuery {
720 name: query.name.clone(),
721 if_exists: query.if_exists,
722 },
723 ),
724 CollectionModel::Document => self.execute_drop_document(
725 raw_query,
726 &DropDocumentQuery {
727 name: query.name.clone(),
728 if_exists: query.if_exists,
729 },
730 ),
731 CollectionModel::Kv => self.execute_drop_kv(
732 raw_query,
733 &DropKvQuery {
734 name: query.name.clone(),
735 if_exists: query.if_exists,
736 model: CollectionModel::Kv,
737 },
738 ),
739 CollectionModel::Config => self.execute_drop_kv(
740 raw_query,
741 &DropKvQuery {
742 name: query.name.clone(),
743 if_exists: query.if_exists,
744 model: CollectionModel::Config,
745 },
746 ),
747 CollectionModel::Vault => self.execute_drop_kv(
748 raw_query,
749 &DropKvQuery {
750 name: query.name.clone(),
751 if_exists: query.if_exists,
752 model: CollectionModel::Vault,
753 },
754 ),
755 CollectionModel::Hll => self.execute_probabilistic_command(
756 raw_query,
757 &ProbabilisticCommand::DropHll {
758 name: query.name.clone(),
759 if_exists: query.if_exists,
760 },
761 ),
762 CollectionModel::Sketch => self.execute_probabilistic_command(
763 raw_query,
764 &ProbabilisticCommand::DropSketch {
765 name: query.name.clone(),
766 if_exists: query.if_exists,
767 },
768 ),
769 CollectionModel::Filter => self.execute_probabilistic_command(
770 raw_query,
771 &ProbabilisticCommand::DropFilter {
772 name: query.name.clone(),
773 if_exists: query.if_exists,
774 },
775 ),
776 CollectionModel::Metrics => self.execute_drop_typed_collection(
777 raw_query,
778 &query.name,
779 query.if_exists,
780 CollectionModel::Metrics,
781 "metrics",
782 ),
783 CollectionModel::Mixed => self.execute_drop_typed_collection(
784 raw_query,
785 &query.name,
786 query.if_exists,
787 CollectionModel::Mixed,
788 "collection",
789 ),
790 }
791 }
792
793 pub fn execute_alter_table(
799 &self,
800 raw_query: &str,
801 query: &AlterTableQuery,
802 ) -> RedDBResult<RuntimeQueryResult> {
803 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
804 let store = self.inner.db.store();
805
806 if store.get_collection(&query.name).is_none() {
808 return Err(RedDBError::NotFound(format!(
809 "table '{}' not found",
810 query.name
811 )));
812 }
813
814 let mut messages = Vec::new();
815
816 let fields_added: Vec<String> = query
818 .operations
819 .iter()
820 .filter_map(|op| {
821 if let AlterOperation::AddColumn(col) = op {
822 Some(col.name.clone())
823 } else {
824 None
825 }
826 })
827 .collect();
828 let fields_removed: Vec<String> = query
829 .operations
830 .iter()
831 .filter_map(|op| {
832 if let AlterOperation::DropColumn(name) = op {
833 Some(name.clone())
834 } else {
835 None
836 }
837 })
838 .collect();
839
840 for op in &query.operations {
841 match op {
842 AlterOperation::AddColumn(col) => {
843 messages.push(format!("column '{}' added", col.name));
845 }
846 AlterOperation::DropColumn(name) => {
847 messages.push(format!("column '{}' dropped", name));
848 }
849 AlterOperation::RenameColumn { from, to } => {
850 messages.push(format!("column '{}' renamed to '{}'", from, to));
851 }
852 AlterOperation::AttachPartition { child, bound } => {
853 store.set_config_tree(
857 &format!("partition.{}.children.{}", query.name, child),
858 &crate::serde_json::Value::String(bound.clone()),
859 );
860 messages.push(format!(
861 "partition '{child}' attached to '{}' ({bound})",
862 query.name
863 ));
864 }
865 AlterOperation::DetachPartition { child } => {
866 store.set_config_tree(
867 &format!("partition.{}.children.{}", query.name, child),
868 &crate::serde_json::Value::Null,
869 );
870 messages.push(format!(
871 "partition '{child}' detached from '{}'",
872 query.name
873 ));
874 }
875 AlterOperation::EnableRowLevelSecurity => {
876 self.inner
877 .rls_enabled_tables
878 .write()
879 .insert(query.name.clone());
880 store.set_config_tree(
882 &format!("rls.enabled.{}", query.name),
883 &crate::serde_json::Value::Bool(true),
884 );
885 self.invalidate_plan_cache();
886 messages.push(format!("row level security enabled on '{}'", query.name));
887 }
888 AlterOperation::DisableRowLevelSecurity => {
889 self.inner.rls_enabled_tables.write().remove(&query.name);
890 store.set_config_tree(
891 &format!("rls.enabled.{}", query.name),
892 &crate::serde_json::Value::Null,
893 );
894 self.invalidate_plan_cache();
895 messages.push(format!("row level security disabled on '{}'", query.name));
896 }
897 AlterOperation::EnableTenancy { column } => {
899 store.set_config_tree(
900 &format!("tenant_tables.{}.column", query.name),
901 &crate::serde_json::Value::String(column.clone()),
902 );
903 self.register_tenant_table(&query.name, column);
904 self.invalidate_plan_cache();
905 messages.push(format!(
906 "tenancy enabled on '{}' by column '{column}'",
907 query.name
908 ));
909 }
910 AlterOperation::DisableTenancy => {
911 store.set_config_tree(
912 &format!("tenant_tables.{}.column", query.name),
913 &crate::serde_json::Value::Null,
914 );
915 self.unregister_tenant_table(&query.name);
916 self.invalidate_plan_cache();
917 messages.push(format!("tenancy disabled on '{}'", query.name));
918 }
919 AlterOperation::SetAppendOnly(on) => {
920 messages.push(format!(
925 "append_only {} on '{}'",
926 if *on { "enabled" } else { "disabled" },
927 query.name
928 ));
929 }
930 AlterOperation::SetVersioned(on) => {
931 self.vcs_set_versioned(&query.name, *on)?;
938 messages.push(format!(
939 "versioned {} on '{}'",
940 if *on { "enabled" } else { "disabled" },
941 query.name
942 ));
943 }
944 AlterOperation::EnableEvents(subscription) => {
945 let mut subscription = subscription.clone();
946 subscription.source = query.name.clone();
947 validate_event_subscriptions(
948 self,
949 &query.name,
950 std::slice::from_ref(&subscription),
951 )?;
952 ensure_event_target_queue(self, &subscription.target_queue)?;
953 messages.push(format!(
954 "events enabled on '{}' to '{}'",
955 query.name, subscription.target_queue
956 ));
957 }
958 AlterOperation::DisableEvents => {
959 messages.push(format!("events disabled on '{}'", query.name));
960 }
961 AlterOperation::AddSubscription { name, descriptor } => {
962 let mut sub = descriptor.clone();
963 sub.name = name.clone();
964 sub.source = query.name.clone();
965 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
966 ensure_event_target_queue(self, &sub.target_queue)?;
967 messages.push(format!(
968 "subscription '{}' added on '{}' to '{}'",
969 name, query.name, sub.target_queue
970 ));
971 }
972 AlterOperation::DropSubscription { name } => {
973 messages.push(format!(
974 "subscription '{}' dropped on '{}'",
975 name, query.name
976 ));
977 }
978 AlterOperation::AddSigner { pubkey } => {
979 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
986 return Err(RedDBError::Query(format!(
987 "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
988 recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
989 query.name
990 )));
991 }
992 let actor = crate::runtime::impl_core::current_user_projected()
993 .unwrap_or_else(|| "@system/alter".to_string());
994 let changed = crate::runtime::signed_writes_kind::add_signer(
995 &store,
996 &query.name,
997 *pubkey,
998 &actor,
999 );
1000 messages.push(format!(
1001 "signer {} on '{}'",
1002 if changed { "added" } else { "already present" },
1003 query.name
1004 ));
1005 }
1006 AlterOperation::RevokeSigner { pubkey } => {
1007 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1008 return Err(RedDBError::Query(format!(
1009 "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1010 query.name
1011 )));
1012 }
1013 let actor = crate::runtime::impl_core::current_user_projected()
1014 .unwrap_or_else(|| "@system/alter".to_string());
1015 let changed = crate::runtime::signed_writes_kind::revoke_signer(
1016 &store,
1017 &query.name,
1018 pubkey,
1019 &actor,
1020 );
1021 messages.push(format!(
1022 "signer {} on '{}'",
1023 if changed {
1024 "revoked"
1025 } else {
1026 "already revoked"
1027 },
1028 query.name
1029 ));
1030 }
1031 AlterOperation::SetRetention { duration_ms } => {
1032 let existing = self.inner.db.collection_contract(&query.name);
1038 let has_ts_column = existing
1039 .as_ref()
1040 .map(retention_timestamp_column_exists)
1041 .unwrap_or(false);
1042 if !has_ts_column {
1043 return Err(RedDBError::Query(format!(
1044 "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1045 column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1046 or enable WITH timestamps = true before setting a \
1047 retention policy",
1048 query.name
1049 )));
1050 }
1051 messages.push(format!(
1052 "retention set to {duration_ms} ms on '{}'",
1053 query.name
1054 ));
1055 }
1056 AlterOperation::UnsetRetention => {
1057 messages.push(format!("retention cleared on '{}'", query.name));
1058 }
1059 }
1060 }
1061
1062 let mut contract = self
1063 .inner
1064 .db
1065 .collection_contract(&query.name)
1066 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1067 apply_alter_operations_to_contract(&mut contract, &query.operations);
1068 contract.version = contract.version.saturating_add(1);
1069 contract.updated_at_unix_ms = current_unix_ms();
1070 self.inner
1071 .db
1072 .save_collection_contract(contract)
1073 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1074 if !fields_added.is_empty() || !fields_removed.is_empty() {
1078 let sub_names: Vec<String> = self
1079 .inner
1080 .db
1081 .collection_contract(&query.name)
1082 .map(|c| {
1083 c.subscriptions
1084 .iter()
1085 .filter(|s| s.enabled)
1086 .map(|s| s.name.clone())
1087 .collect()
1088 })
1089 .unwrap_or_default();
1090 if !sub_names.is_empty() {
1091 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1092 collection: query.name.clone(),
1093 subscription_names: sub_names.join(", "),
1094 fields_added: fields_added.join(", "),
1095 fields_removed: fields_removed.join(", "),
1096 lsn: self.cdc_current_lsn(),
1097 }
1098 .emit_global();
1099 }
1100 }
1101
1102 self.clear_table_planner_stats(&query.name);
1103 self.invalidate_result_cache();
1104 let post_alter_columns: Vec<String> = self
1109 .inner
1110 .db
1111 .collection_contract(&query.name)
1112 .map(|contract| {
1113 contract
1114 .declared_columns
1115 .iter()
1116 .map(|col| col.name.clone())
1117 .collect()
1118 })
1119 .unwrap_or_default();
1120 self.schema_vocabulary_apply(
1121 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1122 collection: query.name.clone(),
1123 columns: post_alter_columns,
1124 type_tags: Vec::new(),
1125 description: None,
1126 },
1127 );
1128
1129 let message = if messages.is_empty() {
1130 format!("table '{}' altered (no operations)", query.name)
1131 } else {
1132 format!("table '{}' altered: {}", query.name, messages.join(", "))
1133 };
1134
1135 Ok(RuntimeQueryResult::ok_message(
1136 raw_query.to_string(),
1137 &message,
1138 "alter",
1139 ))
1140 }
1141
1142 pub fn execute_explain_alter(
1149 &self,
1150 raw_query: &str,
1151 query: &ExplainAlterQuery,
1152 ) -> RedDBResult<RuntimeQueryResult> {
1153 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1157
1158 let current_contract = self.inner.db.collection_contract(&query.target.name);
1159
1160 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1161 .as_ref()
1162 .map(|c| c.declared_columns.clone())
1163 .unwrap_or_default();
1164
1165 let diff = super::schema_diff::compute_column_diff(
1166 &query.target.name,
1167 ¤t_columns,
1168 &query.target.columns,
1169 );
1170
1171 let rendered = match query.format {
1172 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1173 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1174 };
1175
1176 let format_label = match query.format {
1177 ExplainFormat::Sql => "sql",
1178 ExplainFormat::Json => "json",
1179 };
1180
1181 let columns = vec![
1182 "table".to_string(),
1183 "format".to_string(),
1184 "diff".to_string(),
1185 ];
1186 let row = vec![
1187 ("table".to_string(), Value::text(query.target.name.clone())),
1188 ("format".to_string(), Value::text(format_label.to_string())),
1189 ("diff".to_string(), Value::text(rendered)),
1190 ];
1191
1192 Ok(RuntimeQueryResult::ok_records(
1193 raw_query.to_string(),
1194 columns,
1195 vec![row],
1196 "explain",
1197 ))
1198 }
1199
1200 pub fn execute_create_index(
1205 &self,
1206 raw_query: &str,
1207 query: &CreateIndexQuery,
1208 ) -> RedDBResult<RuntimeQueryResult> {
1209 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1210 let store = self.inner.db.store();
1211
1212 let manager = store
1214 .get_collection(&query.table)
1215 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1216
1217 let method_kind = match query.method {
1218 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1219 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1220 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1221 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1222 };
1223
1224 let entities = manager.query_all(|_| true);
1234 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1235 entities
1236 .iter()
1237 .map(|e| {
1238 let fields = match &e.data {
1239 crate::storage::EntityData::Row(row) => {
1240 if let Some(ref named) = row.named {
1241 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1242 } else if let Some(ref schema) = row.schema {
1243 schema
1247 .iter()
1248 .zip(row.columns.iter())
1249 .map(|(k, v)| (k.clone(), v.clone()))
1250 .collect()
1251 } else {
1252 Vec::new()
1253 }
1254 }
1255 crate::storage::EntityData::Node(node) => node
1256 .properties
1257 .iter()
1258 .map(|(k, v)| (k.clone(), v.clone()))
1259 .collect(),
1260 _ => Vec::new(),
1261 };
1262 (e.id, fields)
1263 })
1264 .collect();
1265
1266 let indexed_count = self
1268 .inner
1269 .index_store
1270 .create_index(
1271 &query.name,
1272 &query.table,
1273 &query.columns,
1274 method_kind,
1275 query.unique,
1276 &entity_fields,
1277 )
1278 .map_err(RedDBError::Internal)?;
1279
1280 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1281 &query.table,
1282 &entity_fields,
1283 );
1284 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1285 self.invalidate_plan_cache();
1286
1287 self.inner
1289 .index_store
1290 .register(super::index_store::RegisteredIndex {
1291 name: query.name.clone(),
1292 collection: query.table.clone(),
1293 columns: query.columns.clone(),
1294 method: method_kind,
1295 unique: query.unique,
1296 });
1297 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1301 collection: query.table.clone(),
1302 index: query.name.clone(),
1303 columns: query.columns.clone(),
1304 });
1305
1306 let method_str = format!("{}", query.method);
1307 let unique_str = if query.unique { "unique " } else { "" };
1308 let cols = query.columns.join(", ");
1309
1310 Ok(RuntimeQueryResult::ok_message(
1311 raw_query.to_string(),
1312 &format!(
1313 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1314 unique_str, query.name, query.table, cols, method_str, indexed_count
1315 ),
1316 "create",
1317 ))
1318 }
1319
1320 pub fn execute_drop_index(
1324 &self,
1325 raw_query: &str,
1326 query: &DropIndexQuery,
1327 ) -> RedDBResult<RuntimeQueryResult> {
1328 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1329 let store = self.inner.db.store();
1330
1331 if store.get_collection(&query.table).is_none() {
1333 if query.if_exists {
1334 return Ok(RuntimeQueryResult::ok_message(
1335 raw_query.to_string(),
1336 &format!("table '{}' does not exist", query.table),
1337 "drop",
1338 ));
1339 }
1340 return Err(RedDBError::NotFound(format!(
1341 "table '{}' not found",
1342 query.table
1343 )));
1344 }
1345
1346 self.inner.index_store.drop_index(&query.name, &query.table);
1348 self.invalidate_plan_cache();
1349 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1351 collection: query.table.clone(),
1352 index: query.name.clone(),
1353 });
1354
1355 Ok(RuntimeQueryResult::ok_message(
1356 raw_query.to_string(),
1357 &format!("index '{}' dropped from '{}'", query.name, query.table),
1358 "drop",
1359 ))
1360 }
1361
1362 fn execute_drop_typed_collection(
1363 &self,
1364 raw_query: &str,
1365 name: &str,
1366 if_exists: bool,
1367 expected_model: CollectionModel,
1368 label: &str,
1369 ) -> RedDBResult<RuntimeQueryResult> {
1370 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1371 if is_system_schema_name(name) {
1372 return Err(RedDBError::Query("system schema is read-only".to_string()));
1373 }
1374 let store = self.inner.db.store();
1375 if store.get_collection(name).is_none() {
1376 if if_exists {
1377 return Ok(RuntimeQueryResult::ok_message(
1378 raw_query.to_string(),
1379 &format!("{label} '{name}' does not exist"),
1380 "drop",
1381 ));
1382 }
1383 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1384 }
1385
1386 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1387 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1388 self.drop_collection_storage(raw_query, name, label)
1389 }
1390
1391 pub fn execute_truncate(
1392 &self,
1393 raw_query: &str,
1394 query: &TruncateQuery,
1395 ) -> RedDBResult<RuntimeQueryResult> {
1396 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1397 if is_system_schema_name(&query.name) {
1398 return Err(RedDBError::Query("system schema is read-only".to_string()));
1399 }
1400
1401 let label = query
1402 .model
1403 .map(polymorphic_resolver::model_name)
1404 .unwrap_or("collection");
1405 let store = self.inner.db.store();
1406 if store.get_collection(&query.name).is_none() {
1407 if query.if_exists {
1408 return Ok(RuntimeQueryResult::ok_message(
1409 raw_query.to_string(),
1410 &format!("{label} '{}' does not exist", query.name),
1411 "truncate",
1412 ));
1413 }
1414 return Err(RedDBError::NotFound(format!(
1415 "{label} '{}' not found",
1416 query.name
1417 )));
1418 }
1419
1420 let actual =
1421 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1422 if let Some(expected) = query.model {
1423 polymorphic_resolver::ensure_model_match(expected, actual)?;
1424 }
1425
1426 if actual == CollectionModel::Queue {
1427 return self.execute_queue_command(
1428 raw_query,
1429 &QueueCommand::Purge {
1430 queue: query.name.clone(),
1431 },
1432 );
1433 }
1434
1435 let affected = self.truncate_collection_entities(&query.name)?;
1437 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1439 self.inner.db.invalidate_vector_index(&query.name);
1440 self.clear_table_planner_stats(&query.name);
1441 self.invalidate_result_cache();
1442
1443 Ok(RuntimeQueryResult::ok_message(
1444 raw_query.to_string(),
1445 &format!(
1446 "{affected} entities truncated from {label} '{}'",
1447 query.name
1448 ),
1449 "truncate",
1450 ))
1451 }
1452
1453 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1454 let store = self.inner.db.store();
1455 let Some(manager) = store.get_collection(name) else {
1456 return Ok(0);
1457 };
1458 let entities = manager.query_all(|_| true);
1459 if entities.is_empty() {
1460 return Ok(0);
1461 }
1462
1463 for entity in &entities {
1464 let fields = entity_index_fields(&entity.data);
1465 self.inner
1466 .index_store
1467 .index_entity_delete(name, entity.id, &fields)
1468 .map_err(RedDBError::Internal)?;
1469 }
1470
1471 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1472 let deleted_ids = store
1473 .delete_batch(name, &ids)
1474 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1475 for id in &deleted_ids {
1476 store.context_index().remove_entity(*id);
1477 }
1478 Ok(deleted_ids.len() as u64)
1479 }
1480
1481 fn drop_collection_storage(
1482 &self,
1483 raw_query: &str,
1484 name: &str,
1485 label: &str,
1486 ) -> RedDBResult<RuntimeQueryResult> {
1487 let store = self.inner.db.store();
1488
1489 let final_count = store
1492 .get_collection(name)
1493 .map(|manager| manager.query_all(|_| true).len() as u64)
1494 .unwrap_or(0);
1495 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1496 self,
1497 name,
1498 final_count,
1499 )?;
1500
1501 let orphaned_indices: Vec<String> = self
1502 .inner
1503 .index_store
1504 .list_indices(name)
1505 .into_iter()
1506 .map(|index| index.name)
1507 .collect();
1508 for index_name in &orphaned_indices {
1509 self.inner.index_store.drop_index(index_name, name);
1510 }
1511
1512 store
1513 .drop_collection(name)
1514 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1515 self.inner.db.invalidate_vector_index(name);
1516 self.inner.db.clear_collection_default_ttl_ms(name);
1517 self.inner
1518 .db
1519 .remove_collection_contract(name)
1520 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1521 self.clear_table_planner_stats(name);
1522 self.invalidate_result_cache();
1523 if let Some(store) = self.inner.auth_store.read().clone() {
1524 store.invalidate_visible_collections_cache();
1525 }
1526 self.inner
1527 .db
1528 .persist_metadata()
1529 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1530 self.schema_vocabulary_apply(
1531 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1532 collection: name.to_string(),
1533 },
1534 );
1535
1536 Ok(RuntimeQueryResult::ok_message(
1537 raw_query.to_string(),
1538 &format!("{label} '{name}' dropped"),
1539 "drop",
1540 ))
1541 }
1542}
1543
1544pub(crate) fn is_system_schema_name(name: &str) -> bool {
1545 name == "red" || name.starts_with("red.") || name.starts_with("__red_schema_")
1546}
1547
1548fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1549 match data {
1550 EntityData::Row(row) => {
1551 if let Some(ref named) = row.named {
1552 named
1553 .iter()
1554 .map(|(key, value)| (key.clone(), value.clone()))
1555 .collect()
1556 } else if let Some(ref schema) = row.schema {
1557 schema
1558 .iter()
1559 .zip(row.columns.iter())
1560 .map(|(key, value)| (key.clone(), value.clone()))
1561 .collect()
1562 } else {
1563 Vec::new()
1564 }
1565 }
1566 EntityData::Node(node) => node
1567 .properties
1568 .iter()
1569 .map(|(key, value)| (key.clone(), value.clone()))
1570 .collect(),
1571 _ => Vec::new(),
1572 }
1573}
1574
1575fn collection_contract_from_create_table(
1576 query: &CreateTableQuery,
1577) -> RedDBResult<crate::physical::CollectionContract> {
1578 let now = current_unix_ms();
1579 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1580 .columns
1581 .iter()
1582 .map(declared_column_contract_from_ddl)
1583 .collect();
1584 if query.timestamps {
1585 declared_columns.push(crate::physical::DeclaredColumnContract {
1589 name: "created_at".to_string(),
1590 data_type: "BIGINT".to_string(),
1591 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1592 not_null: true,
1593 default: None,
1594 compress: None,
1595 unique: false,
1596 primary_key: false,
1597 enum_variants: Vec::new(),
1598 array_element: None,
1599 decimal_precision: None,
1600 });
1601 declared_columns.push(crate::physical::DeclaredColumnContract {
1602 name: "updated_at".to_string(),
1603 data_type: "BIGINT".to_string(),
1604 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1605 not_null: true,
1606 default: None,
1607 compress: None,
1608 unique: false,
1609 primary_key: false,
1610 enum_variants: Vec::new(),
1611 array_element: None,
1612 decimal_precision: None,
1613 });
1614 }
1615 Ok(crate::physical::CollectionContract {
1616 name: query.name.clone(),
1617 declared_model: crate::catalog::CollectionModel::Table,
1618 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1619 origin: crate::physical::ContractOrigin::Explicit,
1620 version: 1,
1621 created_at_unix_ms: now,
1622 updated_at_unix_ms: now,
1623 default_ttl_ms: query.default_ttl_ms,
1624 vector_dimension: None,
1625 vector_metric: None,
1626 context_index_fields: query.context_index_fields.clone(),
1627 declared_columns,
1628 table_def: Some(build_table_def_from_create_table(query)?),
1629 timestamps_enabled: query.timestamps,
1630 context_index_enabled: query.context_index_enabled
1631 || !query.context_index_fields.is_empty(),
1632 metrics_raw_retention_ms: None,
1633 metrics_rollup_policies: Vec::new(),
1634 metrics_tenant_identity: None,
1635 metrics_namespace: None,
1636 append_only: query.append_only,
1637 subscriptions: query.subscriptions.clone(),
1638 session_key: None,
1639 session_gap_ms: None,
1640 retention_duration_ms: None,
1641 })
1642}
1643
1644fn default_collection_contract_for_existing_table(
1645 name: &str,
1646) -> crate::physical::CollectionContract {
1647 let now = current_unix_ms();
1648 crate::physical::CollectionContract {
1649 name: name.to_string(),
1650 declared_model: crate::catalog::CollectionModel::Table,
1651 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1652 origin: crate::physical::ContractOrigin::Explicit,
1653 version: 0,
1654 created_at_unix_ms: now,
1655 updated_at_unix_ms: now,
1656 default_ttl_ms: None,
1657 vector_dimension: None,
1658 vector_metric: None,
1659 context_index_fields: Vec::new(),
1660 declared_columns: Vec::new(),
1661 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1662 timestamps_enabled: false,
1663 context_index_enabled: false,
1664 metrics_raw_retention_ms: None,
1665 metrics_rollup_policies: Vec::new(),
1666 metrics_tenant_identity: None,
1667 metrics_namespace: None,
1668 append_only: false,
1669 subscriptions: Vec::new(),
1670 session_key: None,
1671 session_gap_ms: None,
1672 retention_duration_ms: None,
1673 }
1674}
1675
1676fn keyed_collection_contract(
1677 name: &str,
1678 model: crate::catalog::CollectionModel,
1679) -> crate::physical::CollectionContract {
1680 let now = current_unix_ms();
1681 crate::physical::CollectionContract {
1682 name: name.to_string(),
1683 declared_model: model,
1684 schema_mode: crate::catalog::SchemaMode::Dynamic,
1685 origin: crate::physical::ContractOrigin::Explicit,
1686 version: 1,
1687 created_at_unix_ms: now,
1688 updated_at_unix_ms: now,
1689 default_ttl_ms: None,
1690 vector_dimension: None,
1691 vector_metric: None,
1692 context_index_fields: Vec::new(),
1693 declared_columns: Vec::new(),
1694 table_def: None,
1695 timestamps_enabled: false,
1696 context_index_enabled: false,
1697 metrics_raw_retention_ms: None,
1698 metrics_rollup_policies: Vec::new(),
1699 metrics_tenant_identity: None,
1700 metrics_namespace: None,
1701 append_only: false,
1702 subscriptions: Vec::new(),
1703 session_key: None,
1704 session_gap_ms: None,
1705 retention_duration_ms: None,
1706 }
1707}
1708
1709fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1710 let now = current_unix_ms();
1711 crate::physical::CollectionContract {
1712 name: query.name.clone(),
1713 declared_model: crate::catalog::CollectionModel::Metrics,
1714 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1715 origin: crate::physical::ContractOrigin::Explicit,
1716 version: 1,
1717 created_at_unix_ms: now,
1718 updated_at_unix_ms: now,
1719 default_ttl_ms: query.default_ttl_ms,
1720 vector_dimension: None,
1721 vector_metric: None,
1722 context_index_fields: Vec::new(),
1723 declared_columns: Vec::new(),
1724 table_def: None,
1725 timestamps_enabled: false,
1726 context_index_enabled: false,
1727 metrics_raw_retention_ms: query.default_ttl_ms,
1728 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1729 metrics_tenant_identity: Some(
1730 query
1731 .tenant_by
1732 .clone()
1733 .unwrap_or_else(|| "current_tenant".to_string()),
1734 ),
1735 metrics_namespace: Some("default".to_string()),
1736 append_only: true,
1737 subscriptions: Vec::new(),
1738 session_key: None,
1739 session_gap_ms: None,
1740 retention_duration_ms: None,
1741 }
1742}
1743
1744fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1745 let now = current_unix_ms();
1746 crate::physical::CollectionContract {
1747 name: query.name.clone(),
1748 declared_model: crate::catalog::CollectionModel::Vector,
1749 schema_mode: crate::catalog::SchemaMode::Dynamic,
1750 origin: crate::physical::ContractOrigin::Explicit,
1751 version: 1,
1752 created_at_unix_ms: now,
1753 updated_at_unix_ms: now,
1754 default_ttl_ms: None,
1755 vector_dimension: Some(query.dimension),
1756 vector_metric: Some(query.metric),
1757 context_index_fields: Vec::new(),
1758 declared_columns: Vec::new(),
1759 table_def: None,
1760 timestamps_enabled: false,
1761 context_index_enabled: false,
1762 metrics_raw_retention_ms: None,
1763 metrics_rollup_policies: Vec::new(),
1764 metrics_tenant_identity: None,
1765 metrics_namespace: None,
1766 append_only: false,
1767 subscriptions: Vec::new(),
1768 session_key: None,
1769 session_gap_ms: None,
1770 retention_duration_ms: None,
1771 }
1772}
1773
1774fn declared_column_contract_from_ddl(
1775 column: &CreateColumnDef,
1776) -> crate::physical::DeclaredColumnContract {
1777 crate::physical::DeclaredColumnContract {
1778 name: column.name.clone(),
1779 data_type: column.data_type.clone(),
1780 sql_type: Some(column.sql_type.clone()),
1781 not_null: column.not_null,
1782 default: column.default.clone(),
1783 compress: column.compress,
1784 unique: column.unique,
1785 primary_key: column.primary_key,
1786 enum_variants: column.enum_variants.clone(),
1787 array_element: column.array_element.clone(),
1788 decimal_precision: column.decimal_precision,
1789 }
1790}
1791
1792fn apply_alter_operations_to_contract(
1793 contract: &mut crate::physical::CollectionContract,
1794 operations: &[AlterOperation],
1795) {
1796 if contract.table_def.is_none() {
1797 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1798 }
1799 for operation in operations {
1800 match operation {
1801 AlterOperation::AddColumn(column) => {
1802 if !contract
1803 .declared_columns
1804 .iter()
1805 .any(|existing| existing.name == column.name)
1806 {
1807 contract
1808 .declared_columns
1809 .push(declared_column_contract_from_ddl(column));
1810 }
1811 if let Some(table_def) = contract.table_def.as_mut() {
1812 if table_def.get_column(&column.name).is_none() {
1813 if let Ok(column_def) = column_def_from_ddl(column) {
1814 if column.primary_key {
1815 table_def.primary_key.push(column.name.clone());
1816 table_def.constraints.push(
1817 crate::storage::schema::Constraint::new(
1818 format!("pk_{}", column.name),
1819 crate::storage::schema::ConstraintType::PrimaryKey,
1820 )
1821 .on_columns(vec![column.name.clone()]),
1822 );
1823 }
1824 if column.unique {
1825 table_def.constraints.push(
1826 crate::storage::schema::Constraint::new(
1827 format!("uniq_{}", column.name),
1828 crate::storage::schema::ConstraintType::Unique,
1829 )
1830 .on_columns(vec![column.name.clone()]),
1831 );
1832 }
1833 if column.not_null {
1834 table_def.constraints.push(
1835 crate::storage::schema::Constraint::new(
1836 format!("not_null_{}", column.name),
1837 crate::storage::schema::ConstraintType::NotNull,
1838 )
1839 .on_columns(vec![column.name.clone()]),
1840 );
1841 }
1842 table_def.columns.push(column_def);
1843 }
1844 }
1845 }
1846 }
1847 AlterOperation::DropColumn(name) => {
1848 contract
1849 .declared_columns
1850 .retain(|column| column.name != *name);
1851 if let Some(table_def) = contract.table_def.as_mut() {
1852 if let Some(index) = table_def.column_index(name) {
1853 table_def.columns.remove(index);
1854 }
1855 table_def.primary_key.retain(|column| column != name);
1856 table_def.constraints.retain(|constraint| {
1857 !constraint.columns.iter().any(|column| column == name)
1858 });
1859 table_def
1860 .indexes
1861 .retain(|index| !index.columns.iter().any(|column| column == name));
1862 }
1863 }
1864 AlterOperation::RenameColumn { from, to } => {
1865 if contract
1866 .declared_columns
1867 .iter()
1868 .any(|column| column.name == *to)
1869 {
1870 continue;
1871 }
1872 if let Some(column) = contract
1873 .declared_columns
1874 .iter_mut()
1875 .find(|column| column.name == *from)
1876 {
1877 column.name = to.clone();
1878 }
1879 if let Some(table_def) = contract.table_def.as_mut() {
1880 if let Some(column) = table_def
1881 .columns
1882 .iter_mut()
1883 .find(|column| column.name == *from)
1884 {
1885 column.name = to.clone();
1886 }
1887 for primary_key in &mut table_def.primary_key {
1888 if *primary_key == *from {
1889 *primary_key = to.clone();
1890 }
1891 }
1892 for constraint in &mut table_def.constraints {
1893 for column in &mut constraint.columns {
1894 if *column == *from {
1895 *column = to.clone();
1896 }
1897 }
1898 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1899 for column in ref_columns {
1900 if *column == *from {
1901 *column = to.clone();
1902 }
1903 }
1904 }
1905 }
1906 for index in &mut table_def.indexes {
1907 for column in &mut index.columns {
1908 if *column == *from {
1909 *column = to.clone();
1910 }
1911 }
1912 }
1913 }
1914 }
1915 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1918 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1922 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1925 AlterOperation::SetAppendOnly(on) => {
1926 contract.append_only = *on;
1927 }
1928 AlterOperation::SetVersioned(_) => {}
1931 AlterOperation::EnableEvents(subscription) => {
1932 let mut subscription = subscription.clone();
1933 subscription.source = contract.name.clone();
1934 subscription.enabled = true;
1935 if let Some(existing) = contract
1936 .subscriptions
1937 .iter_mut()
1938 .find(|existing| existing.target_queue == subscription.target_queue)
1939 {
1940 *existing = subscription;
1941 } else {
1942 contract.subscriptions.push(subscription);
1943 }
1944 }
1945 AlterOperation::DisableEvents => {
1946 for subscription in &mut contract.subscriptions {
1947 subscription.enabled = false;
1948 }
1949 }
1950 AlterOperation::AddSubscription { name, descriptor } => {
1951 let mut sub = descriptor.clone();
1952 sub.name = name.clone();
1953 sub.source = contract.name.clone();
1954 sub.enabled = true;
1955 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1956 {
1957 *existing = sub;
1958 } else {
1959 contract.subscriptions.push(sub);
1960 }
1961 }
1962 AlterOperation::DropSubscription { name } => {
1963 contract.subscriptions.retain(|s| s.name != *name);
1964 }
1965 AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
1970 AlterOperation::SetRetention { duration_ms } => {
1971 contract.retention_duration_ms = Some(*duration_ms);
1972 }
1973 AlterOperation::UnsetRetention => {
1974 contract.retention_duration_ms = None;
1975 }
1976 }
1977 }
1978}
1979
1980pub(crate) fn retention_timestamp_column_exists(
1985 contract: &crate::physical::CollectionContract,
1986) -> bool {
1987 if contract.timestamps_enabled {
1988 return true;
1989 }
1990 if matches!(
1991 contract.declared_model,
1992 crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
1993 ) {
1994 return true;
1998 }
1999 contract
2000 .declared_columns
2001 .iter()
2002 .any(|column| is_temporal_data_type(&column.data_type))
2003}
2004
2005fn is_temporal_data_type(data_type: &str) -> bool {
2006 let upper = data_type.to_ascii_uppercase();
2007 matches!(
2008 upper.as_str(),
2009 "TIMESTAMP" | "TIMESTAMPMS" | "TIMESTAMP_MS" | "DATETIME" | "DATE"
2010 )
2011}
2012
2013fn validate_event_subscriptions(
2014 runtime: &RedDBRuntime,
2015 source: &str,
2016 subscriptions: &[crate::catalog::SubscriptionDescriptor],
2017) -> RedDBResult<()> {
2018 for subscription in subscriptions
2019 .iter()
2020 .filter(|subscription| subscription.enabled)
2021 {
2022 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2023 return Err(RedDBError::Query(
2024 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2025 ));
2026 }
2027 validate_subscription_auth(runtime, source, subscription)?;
2028 if subscription.target_queue == source
2029 || subscription_would_create_cycle(
2030 &runtime.inner.db,
2031 source,
2032 &subscription.target_queue,
2033 )
2034 {
2035 return Err(RedDBError::Query(
2036 "subscription would create cycle".to_string(),
2037 ));
2038 }
2039 audit_subscription_redact_gap(runtime, source, subscription);
2040 }
2041 Ok(())
2042}
2043
2044fn validate_subscription_auth(
2045 runtime: &RedDBRuntime,
2046 source: &str,
2047 subscription: &crate::catalog::SubscriptionDescriptor,
2048) -> RedDBResult<()> {
2049 let auth_store = match runtime.inner.auth_store.read().clone() {
2050 Some(store) => store,
2051 None => return Ok(()),
2052 };
2053 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2054 Some(identity) => identity,
2055 None => return Ok(()),
2056 };
2057 let tenant = crate::runtime::impl_core::current_tenant();
2058 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2059
2060 if auth_store.iam_authorization_enabled() {
2061 let ctx = crate::auth::policies::EvalContext {
2062 principal_tenant: tenant.clone(),
2063 current_tenant: tenant.clone(),
2064 peer_ip: None,
2065 mfa_present: false,
2066 now_ms: crate::auth::now_ms(),
2067 principal_is_admin_role: role == crate::auth::Role::Admin,
2068 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2069 principal_is_platform_scoped: principal.tenant.is_none(),
2070 };
2071 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2072 if let Some(t) = tenant.as_deref() {
2073 source_resource = source_resource.with_tenant(t.to_string());
2074 }
2075 if !auth_store.check_policy_authz_with_role(
2076 &principal,
2077 "select",
2078 &source_resource,
2079 &ctx,
2080 role,
2081 ) {
2082 return Err(RedDBError::Query(format!(
2083 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2084 principal, source_resource.kind, source_resource.name
2085 )));
2086 }
2087
2088 let mut target_resource =
2089 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2090 if let Some(t) = tenant.as_deref() {
2091 target_resource = target_resource.with_tenant(t.to_string());
2092 }
2093 if !auth_store.check_policy_authz_with_role(
2094 &principal,
2095 "write",
2096 &target_resource,
2097 &ctx,
2098 role,
2099 ) {
2100 return Err(RedDBError::Query(format!(
2101 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2102 principal, target_resource.kind, target_resource.name
2103 )));
2104 }
2105 return Ok(());
2106 }
2107
2108 let ctx = crate::auth::privileges::AuthzContext {
2109 principal: &username,
2110 effective_role: role,
2111 tenant: tenant.as_deref(),
2112 };
2113 auth_store
2114 .check_grant(
2115 &ctx,
2116 crate::auth::privileges::Action::Select,
2117 &crate::auth::privileges::Resource::table_from_name(source),
2118 )
2119 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2120 auth_store
2121 .check_grant(
2122 &ctx,
2123 crate::auth::privileges::Action::Insert,
2124 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2125 )
2126 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2127 Ok(())
2128}
2129
2130fn audit_subscription_redact_gap(
2131 runtime: &RedDBRuntime,
2132 source: &str,
2133 subscription: &crate::catalog::SubscriptionDescriptor,
2134) {
2135 let auth_store = match runtime.inner.auth_store.read().clone() {
2136 Some(store) if store.iam_authorization_enabled() => store,
2137 _ => return,
2138 };
2139 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2140 Some(identity) => identity,
2141 None => return,
2142 };
2143 let tenant = crate::runtime::impl_core::current_tenant();
2144 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2145 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2146 if missing.is_empty() {
2147 return;
2148 }
2149
2150 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2151 tracing::warn!(
2152 target: "reddb::operator",
2153 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2154 source,
2155 subscription.target_queue,
2156 columns
2157 );
2158 let mut event = AuditEvent::builder("subscription_redact_gap")
2159 .principal(username)
2160 .source(AuditAuthSource::System)
2161 .resource(format!(
2162 "subscription:{}->{}",
2163 source, subscription.target_queue
2164 ))
2165 .outcome(Outcome::Success)
2166 .field(AuditFieldEscaper::field("source", source))
2167 .field(AuditFieldEscaper::field(
2168 "target_queue",
2169 subscription.target_queue.clone(),
2170 ))
2171 .field(AuditFieldEscaper::field(
2172 "subscription",
2173 subscription.name.clone(),
2174 ))
2175 .field(AuditFieldEscaper::field("columns", columns))
2176 .field(AuditFieldEscaper::field("role", role.as_str()));
2177 if let Some(t) = tenant {
2178 event = event.tenant(t);
2179 }
2180 runtime.inner.audit_log.record_event(event.build());
2181}
2182
2183fn subscription_redact_gap_columns(
2184 auth_store: &crate::auth::store::AuthStore,
2185 principal: &crate::auth::UserId,
2186 source: &str,
2187 subscription: &crate::catalog::SubscriptionDescriptor,
2188) -> BTreeSet<String> {
2189 let redacted: HashSet<String> = subscription
2190 .redact_fields
2191 .iter()
2192 .map(|field| field.to_ascii_lowercase())
2193 .collect();
2194 auth_store
2195 .effective_policies(principal)
2196 .iter()
2197 .flat_map(|policy| policy.statements.iter())
2198 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2199 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2200 .flat_map(|statement| statement.resources.iter())
2201 .filter_map(|resource| denied_column_for_source(resource, source))
2202 .filter(|column| !redact_covers_column(&redacted, source, column))
2203 .collect()
2204}
2205
2206fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2207 match pattern {
2208 crate::auth::policies::ActionPattern::Wildcard => true,
2209 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2210 crate::auth::policies::ActionPattern::Prefix(prefix) => {
2211 "select".len() > prefix.len() + 1
2212 && "select".starts_with(prefix)
2213 && "select".as_bytes()[prefix.len()] == b':'
2214 }
2215 }
2216}
2217
2218fn denied_column_for_source(
2219 resource: &crate::auth::policies::ResourcePattern,
2220 source: &str,
2221) -> Option<String> {
2222 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2223 return None;
2224 };
2225 if kind != "column" {
2226 return None;
2227 }
2228 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2229 (column.table_resource_name() == source).then_some(column.column)
2230}
2231
2232fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
2233 let column = column.to_ascii_lowercase();
2234 let qualified = format!("{}.{}", source.to_ascii_lowercase(), column);
2235 redacted.contains("*") || redacted.contains(&column) || redacted.contains(&qualified)
2236}
2237
2238fn subscription_would_create_cycle(
2239 db: &crate::storage::unified::devx::RedDB,
2240 source: &str,
2241 target: &str,
2242) -> bool {
2243 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2244 for contract in db.collection_contracts() {
2245 for subscription in contract
2246 .subscriptions
2247 .into_iter()
2248 .filter(|subscription| subscription.enabled)
2249 {
2250 graph
2251 .entry(subscription.source)
2252 .or_default()
2253 .push(subscription.target_queue);
2254 }
2255 }
2256 graph
2257 .entry(source.to_string())
2258 .or_default()
2259 .push(target.to_string());
2260
2261 let mut stack = vec![target.to_string()];
2262 let mut seen = HashSet::new();
2263 while let Some(node) = stack.pop() {
2264 if node == source {
2265 return true;
2266 }
2267 if !seen.insert(node.clone()) {
2268 continue;
2269 }
2270 if let Some(next) = graph.get(&node) {
2271 stack.extend(next.iter().cloned());
2272 }
2273 }
2274 false
2275}
2276
2277pub(crate) fn ensure_event_target_queue_pub(
2278 runtime: &RedDBRuntime,
2279 queue: &str,
2280) -> RedDBResult<()> {
2281 ensure_event_target_queue(runtime, queue)
2282}
2283
2284fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2285 let store = runtime.inner.db.store();
2286 if store.get_collection(queue).is_some() {
2287 return Ok(());
2288 }
2289 store
2290 .create_collection(queue)
2291 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2292 runtime
2293 .inner
2294 .db
2295 .save_collection_contract(event_queue_collection_contract(queue))
2296 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2297 store.set_config_tree(
2298 &format!("queue.{queue}.mode"),
2299 &crate::serde_json::Value::String("fanout".to_string()),
2300 );
2301 Ok(())
2302}
2303
2304fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2305 let now = current_unix_ms();
2306 crate::physical::CollectionContract {
2307 name: queue.to_string(),
2308 declared_model: crate::catalog::CollectionModel::Queue,
2309 schema_mode: crate::catalog::SchemaMode::Dynamic,
2310 origin: crate::physical::ContractOrigin::Implicit,
2311 version: 1,
2312 created_at_unix_ms: now,
2313 updated_at_unix_ms: now,
2314 default_ttl_ms: None,
2315 vector_dimension: None,
2316 vector_metric: None,
2317 context_index_fields: Vec::new(),
2318 declared_columns: Vec::new(),
2319 table_def: None,
2320 timestamps_enabled: false,
2321 context_index_enabled: false,
2322 metrics_raw_retention_ms: None,
2323 metrics_rollup_policies: Vec::new(),
2324 metrics_tenant_identity: None,
2325 metrics_namespace: None,
2326 append_only: true,
2327 subscriptions: Vec::new(),
2328 session_key: None,
2329 session_gap_ms: None,
2330 retention_duration_ms: None,
2331 }
2332}
2333
2334fn build_table_def_from_create_table(
2335 query: &CreateTableQuery,
2336) -> RedDBResult<crate::storage::schema::TableDef> {
2337 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2338 for column in &query.columns {
2339 if column.primary_key {
2340 table.primary_key.push(column.name.clone());
2341 table.constraints.push(
2342 crate::storage::schema::Constraint::new(
2343 format!("pk_{}", column.name),
2344 crate::storage::schema::ConstraintType::PrimaryKey,
2345 )
2346 .on_columns(vec![column.name.clone()]),
2347 );
2348 }
2349 if column.unique {
2350 table.constraints.push(
2351 crate::storage::schema::Constraint::new(
2352 format!("uniq_{}", column.name),
2353 crate::storage::schema::ConstraintType::Unique,
2354 )
2355 .on_columns(vec![column.name.clone()]),
2356 );
2357 }
2358 if column.not_null {
2359 table.constraints.push(
2360 crate::storage::schema::Constraint::new(
2361 format!("not_null_{}", column.name),
2362 crate::storage::schema::ConstraintType::NotNull,
2363 )
2364 .on_columns(vec![column.name.clone()]),
2365 );
2366 }
2367 table.columns.push(column_def_from_ddl(column)?);
2368 }
2369 if query.timestamps {
2374 table.columns.push(
2375 crate::storage::schema::ColumnDef::new(
2376 "created_at".to_string(),
2377 crate::storage::schema::DataType::UnsignedInteger,
2378 )
2379 .not_null(),
2380 );
2381 table.columns.push(
2382 crate::storage::schema::ColumnDef::new(
2383 "updated_at".to_string(),
2384 crate::storage::schema::DataType::UnsignedInteger,
2385 )
2386 .not_null(),
2387 );
2388 table.constraints.push(
2389 crate::storage::schema::Constraint::new(
2390 "not_null_created_at".to_string(),
2391 crate::storage::schema::ConstraintType::NotNull,
2392 )
2393 .on_columns(vec!["created_at".to_string()]),
2394 );
2395 table.constraints.push(
2396 crate::storage::schema::Constraint::new(
2397 "not_null_updated_at".to_string(),
2398 crate::storage::schema::ConstraintType::NotNull,
2399 )
2400 .on_columns(vec!["updated_at".to_string()]),
2401 );
2402 }
2403 table
2404 .validate()
2405 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2406 Ok(table)
2407}
2408
2409fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2410 let data_type = resolve_declared_data_type(&column.data_type)
2411 .map_err(|err| RedDBError::Query(err.to_string()))?;
2412 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2413 if column.not_null {
2414 column_def = column_def.not_null();
2415 }
2416 if let Some(default) = &column.default {
2417 column_def = column_def.with_default(default.as_bytes().to_vec());
2418 }
2419 if column.compress.unwrap_or(0) > 0 {
2420 column_def = column_def.compressed();
2421 }
2422 if !column.enum_variants.is_empty() {
2423 column_def = column_def.with_variants(column.enum_variants.clone());
2424 }
2425 if let Some(precision) = column.decimal_precision {
2426 column_def = column_def.with_precision(precision);
2427 }
2428 if let Some(element_type) = &column.array_element {
2429 column_def = column_def.with_element_type(
2430 resolve_declared_data_type(element_type)
2431 .map_err(|err| RedDBError::Query(err.to_string()))?,
2432 );
2433 }
2434 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2435 if column.unique {
2436 column_def = column_def.with_metadata("unique", "true");
2437 }
2438 if column.primary_key {
2439 column_def = column_def.with_metadata("primary_key", "true");
2440 }
2441 Ok(column_def)
2442}
2443
2444fn current_unix_ms() -> u128 {
2445 std::time::SystemTime::now()
2446 .duration_since(std::time::UNIX_EPOCH)
2447 .unwrap_or_default()
2448 .as_millis()
2449}
2450
2451#[cfg(test)]
2452mod tests {
2453 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2454 use crate::auth::store::{AuthStore, PrincipalRef};
2455 use crate::auth::UserId;
2456 use crate::auth::{AuthConfig, Role};
2457 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2458 use crate::storage::schema::Value;
2459 use crate::{RedDBOptions, RedDBRuntime};
2460 use std::sync::Arc;
2461
2462 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2463 Policy {
2464 id: id.to_string(),
2465 version: 1,
2466 tenant: None,
2467 created_at: 0,
2468 updated_at: 0,
2469 statements: vec![Statement {
2470 sid: None,
2471 effect: Effect::Allow,
2472 actions: vec![ActionPattern::Exact(action.to_string())],
2473 resources: vec![ResourcePattern::Exact {
2474 kind: "collection".to_string(),
2475 name: collection.to_string(),
2476 }],
2477 condition: None,
2478 }],
2479 }
2480 }
2481
2482 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2483 let store = Arc::new(AuthStore::new(AuthConfig::default()));
2484 *rt.inner.auth_store.write() = Some(store.clone());
2485 store
2486 }
2487
2488 #[test]
2489 fn drop_denied_without_iam_policy() {
2490 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2491 rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2492 let store = wire_auth_store(&rt);
2493 let select_only = Policy {
2495 id: "select-only".to_string(),
2496 version: 1,
2497 tenant: None,
2498 created_at: 0,
2499 updated_at: 0,
2500 statements: vec![Statement {
2501 sid: None,
2502 effect: Effect::Allow,
2503 actions: vec![ActionPattern::Exact("select".to_string())],
2504 resources: vec![ResourcePattern::Wildcard],
2505 condition: None,
2506 }],
2507 };
2508 store.put_policy_internal(select_only).unwrap();
2509 let alice = UserId::from_parts(None, "alice");
2510 store
2511 .attach_policy(PrincipalRef::User(alice), "select-only")
2512 .unwrap();
2513 set_current_auth_identity("alice".to_string(), Role::Write);
2514 let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2515 clear_current_auth_identity();
2516 assert!(
2517 format!("{err}").contains("denied by IAM policy"),
2518 "got: {err}"
2519 );
2520 }
2521
2522 #[test]
2523 fn drop_allowed_with_explicit_iam_policy() {
2524 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2525 rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2526 let store = wire_auth_store(&rt);
2527 let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2528 store.put_policy_internal(policy).unwrap();
2529 let bob = UserId::from_parts(None, "bob");
2530 store
2531 .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2532 .unwrap();
2533 set_current_auth_identity("bob".to_string(), Role::Write);
2534 rt.execute_query("DROP TABLE bar").unwrap();
2535 clear_current_auth_identity();
2536 }
2537
2538 #[test]
2539 fn drop_allowed_with_wildcard_iam_policy() {
2540 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2541 rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2542 let store = wire_auth_store(&rt);
2543 let policy = Policy {
2544 id: "allow-drop-all".to_string(),
2545 version: 1,
2546 tenant: None,
2547 created_at: 0,
2548 updated_at: 0,
2549 statements: vec![Statement {
2550 sid: None,
2551 effect: Effect::Allow,
2552 actions: vec![ActionPattern::Exact("drop".to_string())],
2553 resources: vec![ResourcePattern::Wildcard],
2554 condition: None,
2555 }],
2556 };
2557 store.put_policy_internal(policy).unwrap();
2558 let carl = UserId::from_parts(None, "carl");
2559 store
2560 .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2561 .unwrap();
2562 set_current_auth_identity("carl".to_string(), Role::Write);
2563 rt.execute_query("DROP TABLE baz").unwrap();
2564 clear_current_auth_identity();
2565 }
2566
2567 #[test]
2568 fn truncate_denied_without_iam_policy() {
2569 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2570 rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2571 let store = wire_auth_store(&rt);
2572 store
2579 .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
2580 let select_only = Policy {
2582 id: "select-only-2".to_string(),
2583 version: 1,
2584 tenant: None,
2585 created_at: 0,
2586 updated_at: 0,
2587 statements: vec![Statement {
2588 sid: None,
2589 effect: Effect::Allow,
2590 actions: vec![ActionPattern::Exact("select".to_string())],
2591 resources: vec![ResourcePattern::Wildcard],
2592 condition: None,
2593 }],
2594 };
2595 store.put_policy_internal(select_only).unwrap();
2596 let dana = UserId::from_parts(None, "dana");
2597 store
2598 .attach_policy(PrincipalRef::User(dana), "select-only-2")
2599 .unwrap();
2600 set_current_auth_identity("dana".to_string(), Role::Write);
2601 let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2602 clear_current_auth_identity();
2603 assert!(
2604 format!("{err}").contains("denied by IAM policy"),
2605 "got: {err}"
2606 );
2607 }
2608
2609 #[test]
2610 fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2611 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2612 rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2613 .unwrap();
2614 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2615 .unwrap();
2616 rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2617 .unwrap();
2618
2619 let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2620 assert_eq!(truncated.statement_type, "truncate");
2621 assert_eq!(truncated.affected_rows, 0);
2622
2623 let empty = rt.execute_query("SELECT id FROM users").unwrap();
2624 assert!(empty.result.records.is_empty());
2625
2626 rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2627 .unwrap();
2628 let selected = rt
2629 .execute_query("SELECT name FROM users WHERE id = 3")
2630 .unwrap();
2631 let name = selected.result.records[0].get("name").unwrap();
2632 assert_eq!(name, &Value::text("cy"));
2633 assert!(rt.db().collection_contract("users").is_some());
2634 assert!(rt
2635 .inner
2636 .index_store
2637 .list_indices("users")
2638 .iter()
2639 .any(|index| index.name == "idx_users_id"));
2640 }
2641
2642 #[test]
2643 fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2644 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2645 rt.execute_query("CREATE QUEUE tasks").unwrap();
2646 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2647
2648 let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2649 assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2650
2651 rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2652 let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2653 assert_eq!(
2654 len.result.records[0].get("len"),
2655 Some(&Value::UnsignedInteger(0))
2656 );
2657 }
2658
2659 #[test]
2660 fn truncate_system_schema_is_read_only() {
2661 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2662 let err = rt
2663 .execute_query("TRUNCATE COLLECTION red.collections")
2664 .unwrap_err();
2665 assert!(format!("{err}").contains("system schema is read-only"));
2666 }
2667
2668 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2671 let result = rt
2672 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2673 .expect("peek queue");
2674 result
2675 .result
2676 .records
2677 .iter()
2678 .map(
2679 |record| match record.get("payload").expect("payload column") {
2680 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2681 other => panic!("expected JSON queue payload, got {other:?}"),
2682 },
2683 )
2684 .collect()
2685 }
2686
2687 #[test]
2690 fn truncate_event_enabled_table_emits_single_truncate_event() {
2691 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2692 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2693 .unwrap();
2694 rt.execute_query(
2695 "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2696 )
2697 .unwrap();
2698
2699 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2701
2702 rt.execute_query("TRUNCATE TABLE users").unwrap();
2703
2704 let events = queue_payloads(&rt, "users_events");
2705 assert_eq!(
2707 events.len(),
2708 1,
2709 "expected 1 truncate event, got {}",
2710 events.len()
2711 );
2712 let ev = events[0].as_object().expect("event is object");
2713 assert_eq!(
2714 ev.get("op").and_then(crate::json::Value::as_str),
2715 Some("truncate")
2716 );
2717 assert_eq!(
2718 ev.get("collection").and_then(crate::json::Value::as_str),
2719 Some("users")
2720 );
2721 assert_eq!(
2722 ev.get("entities_count")
2723 .and_then(crate::json::Value::as_u64),
2724 Some(3)
2725 );
2726 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2727 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2728 assert!(ev
2729 .get("event_id")
2730 .and_then(crate::json::Value::as_str)
2731 .is_some_and(|s| !s.is_empty()));
2732 }
2733
2734 #[test]
2736 fn truncate_no_events_collection_emits_nothing() {
2737 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2738 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2739 .unwrap();
2740 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2741 .unwrap();
2742 rt.execute_query("TRUNCATE TABLE plain").unwrap();
2744 let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2746 assert!(rows.result.records.is_empty());
2747 }
2748
2749 #[test]
2753 fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2754 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2755 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2756 .unwrap();
2757 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2758 .unwrap();
2759
2760 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2762
2763 rt.execute_query("DROP TABLE users").unwrap();
2764
2765 let events = queue_payloads(&rt, "users_events");
2767 assert_eq!(
2768 events.len(),
2769 1,
2770 "expected 1 collection_dropped event, got {}",
2771 events.len()
2772 );
2773 let ev = events[0].as_object().expect("event is object");
2774 assert_eq!(
2775 ev.get("op").and_then(crate::json::Value::as_str),
2776 Some("collection_dropped")
2777 );
2778 assert_eq!(
2779 ev.get("collection").and_then(crate::json::Value::as_str),
2780 Some("users")
2781 );
2782 assert_eq!(
2783 ev.get("final_entities_count")
2784 .and_then(crate::json::Value::as_u64),
2785 Some(2)
2786 );
2787 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2788 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2789 assert!(ev
2790 .get("event_id")
2791 .and_then(crate::json::Value::as_str)
2792 .is_some_and(|s| !s.is_empty()));
2793
2794 let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2796 assert!(
2797 format!("{err}").contains("users"),
2798 "expected not-found error"
2799 );
2800 }
2801
2802 #[test]
2805 fn drop_no_events_collection_emits_nothing() {
2806 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2807 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2808 .unwrap();
2809 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2810 .unwrap();
2811 rt.execute_query("DROP TABLE plain").unwrap();
2812 let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2814 assert!(format!("{err}").contains("plain"));
2815 }
2816
2817 #[test]
2821 fn ops_filter_insert_only_ignores_update_and_delete() {
2822 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2823 rt.execute_query(
2824 "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2825 )
2826 .unwrap();
2827 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
2828 .unwrap();
2829 rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
2830 .unwrap();
2831 rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
2832
2833 let events = queue_payloads(&rt, "items_events");
2834 assert_eq!(
2836 events.len(),
2837 1,
2838 "expected 1 insert event, got {}",
2839 events.len()
2840 );
2841 assert_eq!(
2842 events[0]
2843 .as_object()
2844 .unwrap()
2845 .get("op")
2846 .and_then(crate::json::Value::as_str),
2847 Some("insert")
2848 );
2849 }
2850
2851 #[test]
2853 fn where_filter_skips_rows_that_do_not_match() {
2854 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2855 rt.execute_query(
2856 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
2857 )
2858 .unwrap();
2859
2860 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
2862 .unwrap();
2863 rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
2865 .unwrap();
2866
2867 let events = queue_payloads(&rt, "users_events");
2868 assert_eq!(
2869 events.len(),
2870 1,
2871 "expected 1 event (only active), got {}",
2872 events.len()
2873 );
2874 let ev = events[0].as_object().unwrap();
2875 assert_eq!(
2876 ev.get("op").and_then(crate::json::Value::as_str),
2877 Some("insert")
2878 );
2879 let after = ev.get("after").unwrap().as_object().unwrap();
2880 assert_eq!(
2881 after.get("status").and_then(crate::json::Value::as_str),
2882 Some("active")
2883 );
2884 }
2885
2886 #[test]
2888 fn ops_filter_and_where_filter_combined() {
2889 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2890 rt.execute_query(
2891 "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
2892 )
2893 .unwrap();
2894
2895 rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
2897 .unwrap();
2898 rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
2900 .unwrap();
2901 rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
2903 .unwrap();
2904 rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
2906
2907 let events = queue_payloads(&rt, "items_events");
2908 assert_eq!(
2910 events.len(),
2911 1,
2912 "expected 1 event, got {}: {events:?}",
2913 events.len()
2914 );
2915 assert_eq!(
2916 events[0]
2917 .as_object()
2918 .unwrap()
2919 .get("op")
2920 .and_then(crate::json::Value::as_str),
2921 Some("insert")
2922 );
2923 }
2924
2925 #[test]
2927 fn where_filter_on_delete_checks_before_state() {
2928 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2929 rt.execute_query(
2930 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
2931 )
2932 .unwrap();
2933
2934 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
2935 .unwrap();
2936
2937 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
2939 rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
2941
2942 let events = queue_payloads(&rt, "users_events");
2943 assert_eq!(
2944 events.len(),
2945 1,
2946 "expected 1 delete event, got {}",
2947 events.len()
2948 );
2949 let ev = events[0].as_object().unwrap();
2950 assert_eq!(
2951 ev.get("op").and_then(crate::json::Value::as_str),
2952 Some("delete")
2953 );
2954 }
2955
2956 #[test]
2960 fn alter_add_column_on_event_enabled_table_succeeds() {
2961 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2962 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2963 .unwrap();
2964 rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
2966 .unwrap();
2967 let contract = rt.db().collection_contract("users").unwrap();
2969 assert!(
2970 contract.declared_columns.iter().any(|c| c.name == "phone"),
2971 "phone column should be in contract"
2972 );
2973 assert!(
2975 contract.subscriptions.iter().any(|s| s.enabled),
2976 "subscription should remain enabled"
2977 );
2978 }
2979
2980 #[test]
2983 fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
2984 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2985 rt.execute_query(
2986 "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
2987 )
2988 .unwrap();
2989 rt.execute_query("ALTER TABLE items DROP COLUMN secret")
2991 .unwrap();
2992 let contract = rt.db().collection_contract("items").unwrap();
2993 assert!(
2994 !contract.declared_columns.iter().any(|c| c.name == "secret"),
2995 "secret column should be removed"
2996 );
2997 rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
2999 .unwrap();
3000 assert!(
3002 contract.subscriptions.iter().any(|s| s.enabled),
3003 "subscription should remain enabled"
3004 );
3005 }
3006}