1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
15fn vault_master_key_ref(collection: &str) -> String {
16 format!("red.vault.{collection}.master_key")
17}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 crate::reserved_fields::ensure_no_reserved_public_item_fields(
37 query.columns.iter().map(|column| column.name.as_str()),
38 &format!("table '{}'", query.name),
39 )?;
40 let exists = store.get_collection(&query.name).is_some();
42 if exists {
43 if query.if_not_exists {
44 return Ok(RuntimeQueryResult::ok_message(
45 raw_query.to_string(),
46 &format!("table '{}' already exists", query.name),
47 "create",
48 ));
49 }
50 return Err(RedDBError::Query(format!(
51 "table '{}' already exists",
52 query.name
53 )));
54 }
55
56 let contract = collection_contract_from_create_table(query)?;
59 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60 store
62 .create_collection(&query.name)
63 .map_err(|err| RedDBError::Internal(err.to_string()))?;
64 for subscription in &contract.subscriptions {
65 ensure_event_target_queue(self, &subscription.target_queue)?;
66 }
67 if let Some(default_ttl_ms) = query.default_ttl_ms {
68 self.inner
69 .db
70 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71 }
72 self.inner
73 .db
74 .save_collection_contract(contract)
75 .map_err(|err| RedDBError::Internal(err.to_string()))?;
76 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77 store.set_config_tree(
78 &format!("red.collection_tenants.{}", query.name),
79 &crate::serde_json::Value::String(tenant_id),
80 );
81 }
82 self.inner
83 .db
84 .persist_metadata()
85 .map_err(|err| RedDBError::Internal(err.to_string()))?;
86 self.refresh_table_planner_stats(&query.name);
87 self.invalidate_result_cache();
88 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99 if let Some(spec) = &query.partition_by {
106 let kind_str = match spec.kind {
107 crate::storage::query::ast::PartitionKind::Range => "range",
108 crate::storage::query::ast::PartitionKind::List => "list",
109 crate::storage::query::ast::PartitionKind::Hash => "hash",
110 };
111 store.set_config_tree(
112 &format!("partition.{}.by", query.name),
113 &crate::serde_json::Value::String(kind_str.to_string()),
114 );
115 store.set_config_tree(
116 &format!("partition.{}.column", query.name),
117 &crate::serde_json::Value::String(spec.column.clone()),
118 );
119 }
120
121 if let Some(col) = &query.tenant_by {
132 store.set_config_tree(
133 &format!("tenant_tables.{}.column", query.name),
134 &crate::serde_json::Value::String(col.clone()),
135 );
136 self.register_tenant_table(&query.name, col);
137 }
138
139 let ttl_suffix = query
140 .default_ttl_ms
141 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142 .unwrap_or_default();
143
144 let tenant_suffix = query
145 .tenant_by
146 .as_ref()
147 .map(|col| format!(" (tenant-scoped by {col})"))
148 .unwrap_or_default();
149
150 Ok(RuntimeQueryResult::ok_message(
151 raw_query.to_string(),
152 &format!(
153 "table '{}' created{}{}",
154 query.name, ttl_suffix, tenant_suffix
155 ),
156 "create",
157 ))
158 }
159
160 fn execute_create_keyed_collection(
161 &self,
162 raw_query: &str,
163 query: &CreateTableQuery,
164 ) -> RedDBResult<RuntimeQueryResult> {
165 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166 if is_system_schema_name(&query.name) {
167 return Err(RedDBError::Query("system schema is read-only".to_string()));
168 }
169 let store = self.inner.db.store();
170 let label = polymorphic_resolver::model_name(query.collection_model);
171 if store.get_collection(&query.name).is_some() {
172 if query.if_not_exists {
173 return Ok(RuntimeQueryResult::ok_message(
174 raw_query.to_string(),
175 &format!("{label} '{}' already exists", query.name),
176 "create",
177 ));
178 }
179 return Err(RedDBError::Query(format!(
180 "{label} '{}' already exists",
181 query.name
182 )));
183 }
184
185 store
186 .create_collection(&query.name)
187 .map_err(|err| RedDBError::Internal(err.to_string()))?;
188 if query.collection_model == CollectionModel::Vault {
189 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190 let key_scope = if query.vault_own_master_key {
191 "own"
192 } else {
193 "cluster"
194 };
195 store.set_config_tree(
196 &format!("red.vault.{}.key_scope", query.name),
197 &crate::serde_json::Value::String(key_scope.to_string()),
198 );
199 store.set_config_tree(
200 &format!("red.vault.{}.status", query.name),
201 &crate::serde_json::Value::String("sealed".to_string()),
202 );
203 }
204 if query.collection_model == CollectionModel::Metrics {
205 for spec in &query.metrics_rollup_policies {
206 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207 .ok_or_else(|| {
208 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209 })?;
210 if policy.source != "raw" {
211 return Err(RedDBError::Query(format!(
212 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213 spec
214 )));
215 }
216 if !matches!(
217 policy.aggregation.as_str(),
218 "avg" | "sum" | "min" | "max" | "count"
219 ) {
220 return Err(RedDBError::Query(format!(
221 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222 spec
223 )));
224 }
225 }
226 if let Some(raw_retention_ms) = query.default_ttl_ms {
227 self.inner
228 .db
229 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230 store.set_config_tree(
231 &format!("red.metrics.{}.raw_retention_ms", query.name),
232 &crate::serde_json::Value::Number(raw_retention_ms as f64),
233 );
234 }
235 let tenant_identity = query
236 .tenant_by
237 .clone()
238 .unwrap_or_else(|| "current_tenant".to_string());
239 store.set_config_tree(
240 &format!("red.metrics.{}.tenant_identity", query.name),
241 &crate::serde_json::Value::String(tenant_identity),
242 );
243 store.set_config_tree(
244 &format!("red.metrics.{}.namespace", query.name),
245 &crate::serde_json::Value::String("default".to_string()),
246 );
247 if !query.metrics_rollup_policies.is_empty() {
248 store.set_config_tree(
249 &format!("red.metrics.{}.rollup_policies", query.name),
250 &crate::serde_json::Value::Array(
251 query
252 .metrics_rollup_policies
253 .iter()
254 .cloned()
255 .map(crate::serde_json::Value::String)
256 .collect(),
257 ),
258 );
259 }
260 }
261 let contract = if query.collection_model == CollectionModel::Metrics {
262 metrics_collection_contract(query)
263 } else {
264 keyed_collection_contract(&query.name, query.collection_model)
265 };
266 self.inner
267 .db
268 .save_collection_contract(contract)
269 .map_err(|err| RedDBError::Internal(err.to_string()))?;
270 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271 store.set_config_tree(
272 &format!("red.collection_tenants.{}", query.name),
273 &crate::serde_json::Value::String(tenant_id),
274 );
275 }
276 self.inner
277 .db
278 .persist_metadata()
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 self.invalidate_result_cache();
281
282 Ok(RuntimeQueryResult::ok_message(
283 raw_query.to_string(),
284 &format!("{label} '{}' created", query.name),
285 "create",
286 ))
287 }
288
289 pub fn execute_create_collection(
290 &self,
291 raw_query: &str,
292 query: &CreateCollectionQuery,
293 ) -> RedDBResult<RuntimeQueryResult> {
294 let model = match query.kind.as_str() {
295 "graph" => CollectionModel::Graph,
296 "document" => CollectionModel::Document,
297 "metrics" => CollectionModel::Metrics,
298 other => {
299 return Err(RedDBError::Query(format!(
300 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
301 )));
302 }
303 };
304 let create = CreateTableQuery {
305 collection_model: model,
306 name: query.name.clone(),
307 columns: Vec::new(),
308 if_not_exists: query.if_not_exists,
309 default_ttl_ms: None,
310 metrics_rollup_policies: Vec::new(),
311 context_index_fields: Vec::new(),
312 context_index_enabled: false,
313 timestamps: false,
314 partition_by: None,
315 tenant_by: None,
316 append_only: false,
317 subscriptions: Vec::new(),
318 vault_own_master_key: false,
319 };
320 self.execute_create_table(raw_query, &create)
321 }
322
323 pub fn execute_create_vector(
324 &self,
325 raw_query: &str,
326 query: &CreateVectorQuery,
327 ) -> RedDBResult<RuntimeQueryResult> {
328 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
329 if is_system_schema_name(&query.name) {
330 return Err(RedDBError::Query("system schema is read-only".to_string()));
331 }
332 let store = self.inner.db.store();
333 if store.get_collection(&query.name).is_some() {
334 if query.if_not_exists {
335 return Ok(RuntimeQueryResult::ok_message(
336 raw_query.to_string(),
337 &format!("vector '{}' already exists", query.name),
338 "create",
339 ));
340 }
341 return Err(RedDBError::Query(format!(
342 "vector '{}' already exists",
343 query.name
344 )));
345 }
346
347 store
348 .create_collection(&query.name)
349 .map_err(|err| RedDBError::Internal(err.to_string()))?;
350 self.inner
351 .db
352 .save_collection_contract(vector_collection_contract(query))
353 .map_err(|err| RedDBError::Internal(err.to_string()))?;
354 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
355 store.set_config_tree(
356 &format!("red.collection_tenants.{}", query.name),
357 &crate::serde_json::Value::String(tenant_id),
358 );
359 }
360 self.inner
361 .db
362 .persist_metadata()
363 .map_err(|err| RedDBError::Internal(err.to_string()))?;
364 self.invalidate_result_cache();
365
366 Ok(RuntimeQueryResult::ok_message(
367 raw_query.to_string(),
368 &format!("vector '{}' created", query.name),
369 "create",
370 ))
371 }
372
373 fn provision_vault_key_material(
374 &self,
375 collection: &str,
376 own_master_key: bool,
377 ) -> RedDBResult<()> {
378 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
379 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
380 })?;
381 if !auth_store.is_vault_backed() {
382 return Err(RedDBError::Query(
383 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
384 ));
385 }
386
387 if auth_store.vault_secret_key().is_none() {
388 let key = crate::auth::store::random_bytes(32);
389 auth_store
390 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
391 .map_err(|err| RedDBError::Query(err.to_string()))?;
392 }
393
394 if own_master_key {
395 let key = crate::auth::store::random_bytes(32);
396 auth_store
397 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
398 .map_err(|err| RedDBError::Query(err.to_string()))?;
399 }
400
401 Ok(())
402 }
403
404 pub fn execute_drop_table(
408 &self,
409 raw_query: &str,
410 query: &DropTableQuery,
411 ) -> RedDBResult<RuntimeQueryResult> {
412 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
413 let store = self.inner.db.store();
414
415 if is_system_schema_name(&query.name) {
416 return Err(RedDBError::Query("system schema is read-only".to_string()));
417 }
418
419 let exists = store.get_collection(&query.name).is_some();
420 if !exists {
421 if query.if_exists {
422 return Ok(RuntimeQueryResult::ok_message(
423 raw_query.to_string(),
424 &format!("table '{}' does not exist", query.name),
425 "drop",
426 ));
427 }
428 return Err(RedDBError::NotFound(format!(
429 "table '{}' not found",
430 query.name
431 )));
432 }
433 let actual =
434 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
435 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
436
437 let final_count = store
440 .get_collection(&query.name)
441 .map(|manager| manager.query_all(|_| true).len() as u64)
442 .unwrap_or(0);
443 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
444 self,
445 &query.name,
446 final_count,
447 )?;
448
449 let orphaned_indices: Vec<String> = self
450 .inner
451 .index_store
452 .list_indices(&query.name)
453 .into_iter()
454 .map(|index| index.name)
455 .collect();
456 for name in &orphaned_indices {
457 self.inner.index_store.drop_index(name, &query.name);
458 }
459
460 store
461 .drop_collection(&query.name)
462 .map_err(|err| RedDBError::Internal(err.to_string()))?;
463 self.inner.db.invalidate_vector_index(&query.name);
464 self.inner.db.clear_collection_default_ttl_ms(&query.name);
465 self.inner
466 .db
467 .remove_collection_contract(&query.name)
468 .map_err(|err| RedDBError::Internal(err.to_string()))?;
469 self.clear_table_planner_stats(&query.name);
470 self.invalidate_result_cache();
471 if let Some(store) = self.inner.auth_store.read().clone() {
475 store.invalidate_visible_collections_cache();
476 }
477 self.inner
478 .db
479 .persist_metadata()
480 .map_err(|err| RedDBError::Internal(err.to_string()))?;
481 self.schema_vocabulary_apply(
487 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
488 collection: query.name.clone(),
489 },
490 );
491
492 Ok(RuntimeQueryResult::ok_message(
493 raw_query.to_string(),
494 &format!("table '{}' dropped", query.name),
495 "drop",
496 ))
497 }
498
499 pub fn execute_drop_graph(
500 &self,
501 raw_query: &str,
502 query: &DropGraphQuery,
503 ) -> RedDBResult<RuntimeQueryResult> {
504 self.execute_drop_typed_collection(
505 raw_query,
506 &query.name,
507 query.if_exists,
508 CollectionModel::Graph,
509 "graph",
510 )
511 }
512
513 pub fn execute_drop_vector(
514 &self,
515 raw_query: &str,
516 query: &DropVectorQuery,
517 ) -> RedDBResult<RuntimeQueryResult> {
518 self.execute_drop_typed_collection(
519 raw_query,
520 &query.name,
521 query.if_exists,
522 CollectionModel::Vector,
523 "vector",
524 )
525 }
526
527 pub fn execute_drop_document(
528 &self,
529 raw_query: &str,
530 query: &DropDocumentQuery,
531 ) -> RedDBResult<RuntimeQueryResult> {
532 self.execute_drop_typed_collection(
533 raw_query,
534 &query.name,
535 query.if_exists,
536 CollectionModel::Document,
537 "document",
538 )
539 }
540
541 pub fn execute_drop_kv(
542 &self,
543 raw_query: &str,
544 query: &DropKvQuery,
545 ) -> RedDBResult<RuntimeQueryResult> {
546 let label = polymorphic_resolver::model_name(query.model);
547 self.execute_drop_typed_collection(
548 raw_query,
549 &query.name,
550 query.if_exists,
551 query.model,
552 label,
553 )
554 }
555
556 pub fn execute_drop_collection(
557 &self,
558 raw_query: &str,
559 query: &DropCollectionQuery,
560 ) -> RedDBResult<RuntimeQueryResult> {
561 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
562 if is_system_schema_name(&query.name) {
563 return Err(RedDBError::Query("system schema is read-only".to_string()));
564 }
565 let store = self.inner.db.store();
566 if store.get_collection(&query.name).is_none() {
567 if query.if_exists {
568 return Ok(RuntimeQueryResult::ok_message(
569 raw_query.to_string(),
570 &format!("collection '{}' does not exist", query.name),
571 "drop",
572 ));
573 }
574 return Err(RedDBError::NotFound(format!(
575 "collection '{}' not found",
576 query.name
577 )));
578 }
579
580 let actual =
581 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
582 if let Some(expected) = query.model {
583 polymorphic_resolver::ensure_model_match(expected, actual)?;
584 }
585
586 match actual {
587 CollectionModel::Table => self.execute_drop_table(
588 raw_query,
589 &DropTableQuery {
590 name: query.name.clone(),
591 if_exists: query.if_exists,
592 },
593 ),
594 CollectionModel::TimeSeries => self.execute_drop_timeseries(
595 raw_query,
596 &DropTimeSeriesQuery {
597 name: query.name.clone(),
598 if_exists: query.if_exists,
599 },
600 ),
601 CollectionModel::Queue => self.execute_drop_queue(
602 raw_query,
603 &DropQueueQuery {
604 name: query.name.clone(),
605 if_exists: query.if_exists,
606 },
607 ),
608 CollectionModel::Graph => self.execute_drop_graph(
609 raw_query,
610 &DropGraphQuery {
611 name: query.name.clone(),
612 if_exists: query.if_exists,
613 },
614 ),
615 CollectionModel::Vector => self.execute_drop_vector(
616 raw_query,
617 &DropVectorQuery {
618 name: query.name.clone(),
619 if_exists: query.if_exists,
620 },
621 ),
622 CollectionModel::Document => self.execute_drop_document(
623 raw_query,
624 &DropDocumentQuery {
625 name: query.name.clone(),
626 if_exists: query.if_exists,
627 },
628 ),
629 CollectionModel::Kv => self.execute_drop_kv(
630 raw_query,
631 &DropKvQuery {
632 name: query.name.clone(),
633 if_exists: query.if_exists,
634 model: CollectionModel::Kv,
635 },
636 ),
637 CollectionModel::Config => self.execute_drop_kv(
638 raw_query,
639 &DropKvQuery {
640 name: query.name.clone(),
641 if_exists: query.if_exists,
642 model: CollectionModel::Config,
643 },
644 ),
645 CollectionModel::Vault => self.execute_drop_kv(
646 raw_query,
647 &DropKvQuery {
648 name: query.name.clone(),
649 if_exists: query.if_exists,
650 model: CollectionModel::Vault,
651 },
652 ),
653 CollectionModel::Hll => self.execute_probabilistic_command(
654 raw_query,
655 &ProbabilisticCommand::DropHll {
656 name: query.name.clone(),
657 if_exists: query.if_exists,
658 },
659 ),
660 CollectionModel::Sketch => self.execute_probabilistic_command(
661 raw_query,
662 &ProbabilisticCommand::DropSketch {
663 name: query.name.clone(),
664 if_exists: query.if_exists,
665 },
666 ),
667 CollectionModel::Filter => self.execute_probabilistic_command(
668 raw_query,
669 &ProbabilisticCommand::DropFilter {
670 name: query.name.clone(),
671 if_exists: query.if_exists,
672 },
673 ),
674 CollectionModel::Metrics => self.execute_drop_typed_collection(
675 raw_query,
676 &query.name,
677 query.if_exists,
678 CollectionModel::Metrics,
679 "metrics",
680 ),
681 CollectionModel::Mixed => self.execute_drop_typed_collection(
682 raw_query,
683 &query.name,
684 query.if_exists,
685 CollectionModel::Mixed,
686 "collection",
687 ),
688 }
689 }
690
691 pub fn execute_alter_table(
697 &self,
698 raw_query: &str,
699 query: &AlterTableQuery,
700 ) -> RedDBResult<RuntimeQueryResult> {
701 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
702 let store = self.inner.db.store();
703
704 if store.get_collection(&query.name).is_none() {
706 return Err(RedDBError::NotFound(format!(
707 "table '{}' not found",
708 query.name
709 )));
710 }
711
712 let mut messages = Vec::new();
713
714 let fields_added: Vec<String> = query
716 .operations
717 .iter()
718 .filter_map(|op| {
719 if let AlterOperation::AddColumn(col) = op {
720 Some(col.name.clone())
721 } else {
722 None
723 }
724 })
725 .collect();
726 let fields_removed: Vec<String> = query
727 .operations
728 .iter()
729 .filter_map(|op| {
730 if let AlterOperation::DropColumn(name) = op {
731 Some(name.clone())
732 } else {
733 None
734 }
735 })
736 .collect();
737
738 for op in &query.operations {
739 match op {
740 AlterOperation::AddColumn(col) => {
741 messages.push(format!("column '{}' added", col.name));
743 }
744 AlterOperation::DropColumn(name) => {
745 messages.push(format!("column '{}' dropped", name));
746 }
747 AlterOperation::RenameColumn { from, to } => {
748 messages.push(format!("column '{}' renamed to '{}'", from, to));
749 }
750 AlterOperation::AttachPartition { child, bound } => {
751 store.set_config_tree(
755 &format!("partition.{}.children.{}", query.name, child),
756 &crate::serde_json::Value::String(bound.clone()),
757 );
758 messages.push(format!(
759 "partition '{child}' attached to '{}' ({bound})",
760 query.name
761 ));
762 }
763 AlterOperation::DetachPartition { child } => {
764 store.set_config_tree(
765 &format!("partition.{}.children.{}", query.name, child),
766 &crate::serde_json::Value::Null,
767 );
768 messages.push(format!(
769 "partition '{child}' detached from '{}'",
770 query.name
771 ));
772 }
773 AlterOperation::EnableRowLevelSecurity => {
774 self.inner
775 .rls_enabled_tables
776 .write()
777 .insert(query.name.clone());
778 store.set_config_tree(
780 &format!("rls.enabled.{}", query.name),
781 &crate::serde_json::Value::Bool(true),
782 );
783 self.invalidate_plan_cache();
784 messages.push(format!("row level security enabled on '{}'", query.name));
785 }
786 AlterOperation::DisableRowLevelSecurity => {
787 self.inner.rls_enabled_tables.write().remove(&query.name);
788 store.set_config_tree(
789 &format!("rls.enabled.{}", query.name),
790 &crate::serde_json::Value::Null,
791 );
792 self.invalidate_plan_cache();
793 messages.push(format!("row level security disabled on '{}'", query.name));
794 }
795 AlterOperation::EnableTenancy { column } => {
797 store.set_config_tree(
798 &format!("tenant_tables.{}.column", query.name),
799 &crate::serde_json::Value::String(column.clone()),
800 );
801 self.register_tenant_table(&query.name, column);
802 self.invalidate_plan_cache();
803 messages.push(format!(
804 "tenancy enabled on '{}' by column '{column}'",
805 query.name
806 ));
807 }
808 AlterOperation::DisableTenancy => {
809 store.set_config_tree(
810 &format!("tenant_tables.{}.column", query.name),
811 &crate::serde_json::Value::Null,
812 );
813 self.unregister_tenant_table(&query.name);
814 self.invalidate_plan_cache();
815 messages.push(format!("tenancy disabled on '{}'", query.name));
816 }
817 AlterOperation::SetAppendOnly(on) => {
818 messages.push(format!(
823 "append_only {} on '{}'",
824 if *on { "enabled" } else { "disabled" },
825 query.name
826 ));
827 }
828 AlterOperation::SetVersioned(on) => {
829 self.vcs_set_versioned(&query.name, *on)?;
836 messages.push(format!(
837 "versioned {} on '{}'",
838 if *on { "enabled" } else { "disabled" },
839 query.name
840 ));
841 }
842 AlterOperation::EnableEvents(subscription) => {
843 let mut subscription = subscription.clone();
844 subscription.source = query.name.clone();
845 validate_event_subscriptions(
846 self,
847 &query.name,
848 std::slice::from_ref(&subscription),
849 )?;
850 ensure_event_target_queue(self, &subscription.target_queue)?;
851 messages.push(format!(
852 "events enabled on '{}' to '{}'",
853 query.name, subscription.target_queue
854 ));
855 }
856 AlterOperation::DisableEvents => {
857 messages.push(format!("events disabled on '{}'", query.name));
858 }
859 AlterOperation::AddSubscription { name, descriptor } => {
860 let mut sub = descriptor.clone();
861 sub.name = name.clone();
862 sub.source = query.name.clone();
863 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
864 ensure_event_target_queue(self, &sub.target_queue)?;
865 messages.push(format!(
866 "subscription '{}' added on '{}' to '{}'",
867 name, query.name, sub.target_queue
868 ));
869 }
870 AlterOperation::DropSubscription { name } => {
871 messages.push(format!(
872 "subscription '{}' dropped on '{}'",
873 name, query.name
874 ));
875 }
876 }
877 }
878
879 let mut contract = self
880 .inner
881 .db
882 .collection_contract(&query.name)
883 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
884 apply_alter_operations_to_contract(&mut contract, &query.operations);
885 contract.version = contract.version.saturating_add(1);
886 contract.updated_at_unix_ms = current_unix_ms();
887 self.inner
888 .db
889 .save_collection_contract(contract)
890 .map_err(|err| RedDBError::Internal(err.to_string()))?;
891 if !fields_added.is_empty() || !fields_removed.is_empty() {
895 let sub_names: Vec<String> = self
896 .inner
897 .db
898 .collection_contract(&query.name)
899 .map(|c| {
900 c.subscriptions
901 .iter()
902 .filter(|s| s.enabled)
903 .map(|s| s.name.clone())
904 .collect()
905 })
906 .unwrap_or_default();
907 if !sub_names.is_empty() {
908 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
909 collection: query.name.clone(),
910 subscription_names: sub_names.join(", "),
911 fields_added: fields_added.join(", "),
912 fields_removed: fields_removed.join(", "),
913 lsn: self.cdc_current_lsn(),
914 }
915 .emit_global();
916 }
917 }
918
919 self.clear_table_planner_stats(&query.name);
920 self.invalidate_result_cache();
921 let post_alter_columns: Vec<String> = self
926 .inner
927 .db
928 .collection_contract(&query.name)
929 .map(|contract| {
930 contract
931 .declared_columns
932 .iter()
933 .map(|col| col.name.clone())
934 .collect()
935 })
936 .unwrap_or_default();
937 self.schema_vocabulary_apply(
938 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
939 collection: query.name.clone(),
940 columns: post_alter_columns,
941 type_tags: Vec::new(),
942 description: None,
943 },
944 );
945
946 let message = if messages.is_empty() {
947 format!("table '{}' altered (no operations)", query.name)
948 } else {
949 format!("table '{}' altered: {}", query.name, messages.join(", "))
950 };
951
952 Ok(RuntimeQueryResult::ok_message(
953 raw_query.to_string(),
954 &message,
955 "alter",
956 ))
957 }
958
959 pub fn execute_explain_alter(
966 &self,
967 raw_query: &str,
968 query: &ExplainAlterQuery,
969 ) -> RedDBResult<RuntimeQueryResult> {
970 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
974
975 let current_contract = self.inner.db.collection_contract(&query.target.name);
976
977 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
978 .as_ref()
979 .map(|c| c.declared_columns.clone())
980 .unwrap_or_default();
981
982 let diff = super::schema_diff::compute_column_diff(
983 &query.target.name,
984 ¤t_columns,
985 &query.target.columns,
986 );
987
988 let rendered = match query.format {
989 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
990 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
991 };
992
993 let format_label = match query.format {
994 ExplainFormat::Sql => "sql",
995 ExplainFormat::Json => "json",
996 };
997
998 let columns = vec![
999 "table".to_string(),
1000 "format".to_string(),
1001 "diff".to_string(),
1002 ];
1003 let row = vec![
1004 ("table".to_string(), Value::text(query.target.name.clone())),
1005 ("format".to_string(), Value::text(format_label.to_string())),
1006 ("diff".to_string(), Value::text(rendered)),
1007 ];
1008
1009 Ok(RuntimeQueryResult::ok_records(
1010 raw_query.to_string(),
1011 columns,
1012 vec![row],
1013 "explain",
1014 ))
1015 }
1016
1017 pub fn execute_create_index(
1022 &self,
1023 raw_query: &str,
1024 query: &CreateIndexQuery,
1025 ) -> RedDBResult<RuntimeQueryResult> {
1026 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1027 let store = self.inner.db.store();
1028
1029 let manager = store
1031 .get_collection(&query.table)
1032 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1033
1034 let method_kind = match query.method {
1035 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1036 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1037 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1038 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1039 };
1040
1041 let entities = manager.query_all(|_| true);
1051 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1052 entities
1053 .iter()
1054 .map(|e| {
1055 let fields = match &e.data {
1056 crate::storage::EntityData::Row(row) => {
1057 if let Some(ref named) = row.named {
1058 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1059 } else if let Some(ref schema) = row.schema {
1060 schema
1064 .iter()
1065 .zip(row.columns.iter())
1066 .map(|(k, v)| (k.clone(), v.clone()))
1067 .collect()
1068 } else {
1069 Vec::new()
1070 }
1071 }
1072 crate::storage::EntityData::Node(node) => node
1073 .properties
1074 .iter()
1075 .map(|(k, v)| (k.clone(), v.clone()))
1076 .collect(),
1077 _ => Vec::new(),
1078 };
1079 (e.id, fields)
1080 })
1081 .collect();
1082
1083 let indexed_count = self
1085 .inner
1086 .index_store
1087 .create_index(
1088 &query.name,
1089 &query.table,
1090 &query.columns,
1091 method_kind,
1092 query.unique,
1093 &entity_fields,
1094 )
1095 .map_err(RedDBError::Internal)?;
1096
1097 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1098 &query.table,
1099 &entity_fields,
1100 );
1101 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1102 self.invalidate_plan_cache();
1103
1104 self.inner
1106 .index_store
1107 .register(super::index_store::RegisteredIndex {
1108 name: query.name.clone(),
1109 collection: query.table.clone(),
1110 columns: query.columns.clone(),
1111 method: method_kind,
1112 unique: query.unique,
1113 });
1114 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1118 collection: query.table.clone(),
1119 index: query.name.clone(),
1120 columns: query.columns.clone(),
1121 });
1122
1123 let method_str = format!("{}", query.method);
1124 let unique_str = if query.unique { "unique " } else { "" };
1125 let cols = query.columns.join(", ");
1126
1127 Ok(RuntimeQueryResult::ok_message(
1128 raw_query.to_string(),
1129 &format!(
1130 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1131 unique_str, query.name, query.table, cols, method_str, indexed_count
1132 ),
1133 "create",
1134 ))
1135 }
1136
1137 pub fn execute_drop_index(
1141 &self,
1142 raw_query: &str,
1143 query: &DropIndexQuery,
1144 ) -> RedDBResult<RuntimeQueryResult> {
1145 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1146 let store = self.inner.db.store();
1147
1148 if store.get_collection(&query.table).is_none() {
1150 if query.if_exists {
1151 return Ok(RuntimeQueryResult::ok_message(
1152 raw_query.to_string(),
1153 &format!("table '{}' does not exist", query.table),
1154 "drop",
1155 ));
1156 }
1157 return Err(RedDBError::NotFound(format!(
1158 "table '{}' not found",
1159 query.table
1160 )));
1161 }
1162
1163 self.inner.index_store.drop_index(&query.name, &query.table);
1165 self.invalidate_plan_cache();
1166 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1168 collection: query.table.clone(),
1169 index: query.name.clone(),
1170 });
1171
1172 Ok(RuntimeQueryResult::ok_message(
1173 raw_query.to_string(),
1174 &format!("index '{}' dropped from '{}'", query.name, query.table),
1175 "drop",
1176 ))
1177 }
1178
1179 fn execute_drop_typed_collection(
1180 &self,
1181 raw_query: &str,
1182 name: &str,
1183 if_exists: bool,
1184 expected_model: CollectionModel,
1185 label: &str,
1186 ) -> RedDBResult<RuntimeQueryResult> {
1187 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1188 if is_system_schema_name(name) {
1189 return Err(RedDBError::Query("system schema is read-only".to_string()));
1190 }
1191 let store = self.inner.db.store();
1192 if store.get_collection(name).is_none() {
1193 if if_exists {
1194 return Ok(RuntimeQueryResult::ok_message(
1195 raw_query.to_string(),
1196 &format!("{label} '{name}' does not exist"),
1197 "drop",
1198 ));
1199 }
1200 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1201 }
1202
1203 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1204 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1205 self.drop_collection_storage(raw_query, name, label)
1206 }
1207
1208 pub fn execute_truncate(
1209 &self,
1210 raw_query: &str,
1211 query: &TruncateQuery,
1212 ) -> RedDBResult<RuntimeQueryResult> {
1213 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1214 if is_system_schema_name(&query.name) {
1215 return Err(RedDBError::Query("system schema is read-only".to_string()));
1216 }
1217
1218 let label = query
1219 .model
1220 .map(polymorphic_resolver::model_name)
1221 .unwrap_or("collection");
1222 let store = self.inner.db.store();
1223 if store.get_collection(&query.name).is_none() {
1224 if query.if_exists {
1225 return Ok(RuntimeQueryResult::ok_message(
1226 raw_query.to_string(),
1227 &format!("{label} '{}' does not exist", query.name),
1228 "truncate",
1229 ));
1230 }
1231 return Err(RedDBError::NotFound(format!(
1232 "{label} '{}' not found",
1233 query.name
1234 )));
1235 }
1236
1237 let actual =
1238 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1239 if let Some(expected) = query.model {
1240 polymorphic_resolver::ensure_model_match(expected, actual)?;
1241 }
1242
1243 if actual == CollectionModel::Queue {
1244 return self.execute_queue_command(
1245 raw_query,
1246 &QueueCommand::Purge {
1247 queue: query.name.clone(),
1248 },
1249 );
1250 }
1251
1252 let affected = self.truncate_collection_entities(&query.name)?;
1254 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1256 self.inner.db.invalidate_vector_index(&query.name);
1257 self.clear_table_planner_stats(&query.name);
1258 self.invalidate_result_cache();
1259
1260 Ok(RuntimeQueryResult::ok_message(
1261 raw_query.to_string(),
1262 &format!(
1263 "{affected} entities truncated from {label} '{}'",
1264 query.name
1265 ),
1266 "truncate",
1267 ))
1268 }
1269
1270 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1271 let store = self.inner.db.store();
1272 let Some(manager) = store.get_collection(name) else {
1273 return Ok(0);
1274 };
1275 let entities = manager.query_all(|_| true);
1276 if entities.is_empty() {
1277 return Ok(0);
1278 }
1279
1280 for entity in &entities {
1281 let fields = entity_index_fields(&entity.data);
1282 self.inner
1283 .index_store
1284 .index_entity_delete(name, entity.id, &fields)
1285 .map_err(RedDBError::Internal)?;
1286 }
1287
1288 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1289 let deleted_ids = store
1290 .delete_batch(name, &ids)
1291 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1292 for id in &deleted_ids {
1293 store.context_index().remove_entity(*id);
1294 }
1295 Ok(deleted_ids.len() as u64)
1296 }
1297
1298 fn drop_collection_storage(
1299 &self,
1300 raw_query: &str,
1301 name: &str,
1302 label: &str,
1303 ) -> RedDBResult<RuntimeQueryResult> {
1304 let store = self.inner.db.store();
1305
1306 let final_count = store
1309 .get_collection(name)
1310 .map(|manager| manager.query_all(|_| true).len() as u64)
1311 .unwrap_or(0);
1312 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1313 self,
1314 name,
1315 final_count,
1316 )?;
1317
1318 let orphaned_indices: Vec<String> = self
1319 .inner
1320 .index_store
1321 .list_indices(name)
1322 .into_iter()
1323 .map(|index| index.name)
1324 .collect();
1325 for index_name in &orphaned_indices {
1326 self.inner.index_store.drop_index(index_name, name);
1327 }
1328
1329 store
1330 .drop_collection(name)
1331 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1332 self.inner.db.invalidate_vector_index(name);
1333 self.inner.db.clear_collection_default_ttl_ms(name);
1334 self.inner
1335 .db
1336 .remove_collection_contract(name)
1337 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1338 self.clear_table_planner_stats(name);
1339 self.invalidate_result_cache();
1340 if let Some(store) = self.inner.auth_store.read().clone() {
1341 store.invalidate_visible_collections_cache();
1342 }
1343 self.inner
1344 .db
1345 .persist_metadata()
1346 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1347 self.schema_vocabulary_apply(
1348 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1349 collection: name.to_string(),
1350 },
1351 );
1352
1353 Ok(RuntimeQueryResult::ok_message(
1354 raw_query.to_string(),
1355 &format!("{label} '{name}' dropped"),
1356 "drop",
1357 ))
1358 }
1359}
1360
1361pub(crate) fn is_system_schema_name(name: &str) -> bool {
1362 name == "red" || name.starts_with("red.") || name.starts_with("__red_schema_")
1363}
1364
1365fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1366 match data {
1367 EntityData::Row(row) => {
1368 if let Some(ref named) = row.named {
1369 named
1370 .iter()
1371 .map(|(key, value)| (key.clone(), value.clone()))
1372 .collect()
1373 } else if let Some(ref schema) = row.schema {
1374 schema
1375 .iter()
1376 .zip(row.columns.iter())
1377 .map(|(key, value)| (key.clone(), value.clone()))
1378 .collect()
1379 } else {
1380 Vec::new()
1381 }
1382 }
1383 EntityData::Node(node) => node
1384 .properties
1385 .iter()
1386 .map(|(key, value)| (key.clone(), value.clone()))
1387 .collect(),
1388 _ => Vec::new(),
1389 }
1390}
1391
1392fn collection_contract_from_create_table(
1393 query: &CreateTableQuery,
1394) -> RedDBResult<crate::physical::CollectionContract> {
1395 let now = current_unix_ms();
1396 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1397 .columns
1398 .iter()
1399 .map(declared_column_contract_from_ddl)
1400 .collect();
1401 if query.timestamps {
1402 declared_columns.push(crate::physical::DeclaredColumnContract {
1406 name: "created_at".to_string(),
1407 data_type: "BIGINT".to_string(),
1408 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1409 not_null: true,
1410 default: None,
1411 compress: None,
1412 unique: false,
1413 primary_key: false,
1414 enum_variants: Vec::new(),
1415 array_element: None,
1416 decimal_precision: None,
1417 });
1418 declared_columns.push(crate::physical::DeclaredColumnContract {
1419 name: "updated_at".to_string(),
1420 data_type: "BIGINT".to_string(),
1421 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1422 not_null: true,
1423 default: None,
1424 compress: None,
1425 unique: false,
1426 primary_key: false,
1427 enum_variants: Vec::new(),
1428 array_element: None,
1429 decimal_precision: None,
1430 });
1431 }
1432 Ok(crate::physical::CollectionContract {
1433 name: query.name.clone(),
1434 declared_model: crate::catalog::CollectionModel::Table,
1435 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1436 origin: crate::physical::ContractOrigin::Explicit,
1437 version: 1,
1438 created_at_unix_ms: now,
1439 updated_at_unix_ms: now,
1440 default_ttl_ms: query.default_ttl_ms,
1441 vector_dimension: None,
1442 vector_metric: None,
1443 context_index_fields: query.context_index_fields.clone(),
1444 declared_columns,
1445 table_def: Some(build_table_def_from_create_table(query)?),
1446 timestamps_enabled: query.timestamps,
1447 context_index_enabled: query.context_index_enabled
1448 || !query.context_index_fields.is_empty(),
1449 metrics_raw_retention_ms: None,
1450 metrics_rollup_policies: Vec::new(),
1451 metrics_tenant_identity: None,
1452 metrics_namespace: None,
1453 append_only: query.append_only,
1454 subscriptions: query.subscriptions.clone(),
1455 })
1456}
1457
1458fn default_collection_contract_for_existing_table(
1459 name: &str,
1460) -> crate::physical::CollectionContract {
1461 let now = current_unix_ms();
1462 crate::physical::CollectionContract {
1463 name: name.to_string(),
1464 declared_model: crate::catalog::CollectionModel::Table,
1465 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1466 origin: crate::physical::ContractOrigin::Explicit,
1467 version: 0,
1468 created_at_unix_ms: now,
1469 updated_at_unix_ms: now,
1470 default_ttl_ms: None,
1471 vector_dimension: None,
1472 vector_metric: None,
1473 context_index_fields: Vec::new(),
1474 declared_columns: Vec::new(),
1475 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1476 timestamps_enabled: false,
1477 context_index_enabled: false,
1478 metrics_raw_retention_ms: None,
1479 metrics_rollup_policies: Vec::new(),
1480 metrics_tenant_identity: None,
1481 metrics_namespace: None,
1482 append_only: false,
1483 subscriptions: Vec::new(),
1484 }
1485}
1486
1487fn keyed_collection_contract(
1488 name: &str,
1489 model: crate::catalog::CollectionModel,
1490) -> crate::physical::CollectionContract {
1491 let now = current_unix_ms();
1492 crate::physical::CollectionContract {
1493 name: name.to_string(),
1494 declared_model: model,
1495 schema_mode: crate::catalog::SchemaMode::Dynamic,
1496 origin: crate::physical::ContractOrigin::Explicit,
1497 version: 1,
1498 created_at_unix_ms: now,
1499 updated_at_unix_ms: now,
1500 default_ttl_ms: None,
1501 vector_dimension: None,
1502 vector_metric: None,
1503 context_index_fields: Vec::new(),
1504 declared_columns: Vec::new(),
1505 table_def: None,
1506 timestamps_enabled: false,
1507 context_index_enabled: false,
1508 metrics_raw_retention_ms: None,
1509 metrics_rollup_policies: Vec::new(),
1510 metrics_tenant_identity: None,
1511 metrics_namespace: None,
1512 append_only: false,
1513 subscriptions: Vec::new(),
1514 }
1515}
1516
1517fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1518 let now = current_unix_ms();
1519 crate::physical::CollectionContract {
1520 name: query.name.clone(),
1521 declared_model: crate::catalog::CollectionModel::Metrics,
1522 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1523 origin: crate::physical::ContractOrigin::Explicit,
1524 version: 1,
1525 created_at_unix_ms: now,
1526 updated_at_unix_ms: now,
1527 default_ttl_ms: query.default_ttl_ms,
1528 vector_dimension: None,
1529 vector_metric: None,
1530 context_index_fields: Vec::new(),
1531 declared_columns: Vec::new(),
1532 table_def: None,
1533 timestamps_enabled: false,
1534 context_index_enabled: false,
1535 metrics_raw_retention_ms: query.default_ttl_ms,
1536 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1537 metrics_tenant_identity: Some(
1538 query
1539 .tenant_by
1540 .clone()
1541 .unwrap_or_else(|| "current_tenant".to_string()),
1542 ),
1543 metrics_namespace: Some("default".to_string()),
1544 append_only: true,
1545 subscriptions: Vec::new(),
1546 }
1547}
1548
1549fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1550 let now = current_unix_ms();
1551 crate::physical::CollectionContract {
1552 name: query.name.clone(),
1553 declared_model: crate::catalog::CollectionModel::Vector,
1554 schema_mode: crate::catalog::SchemaMode::Dynamic,
1555 origin: crate::physical::ContractOrigin::Explicit,
1556 version: 1,
1557 created_at_unix_ms: now,
1558 updated_at_unix_ms: now,
1559 default_ttl_ms: None,
1560 vector_dimension: Some(query.dimension),
1561 vector_metric: Some(query.metric),
1562 context_index_fields: Vec::new(),
1563 declared_columns: Vec::new(),
1564 table_def: None,
1565 timestamps_enabled: false,
1566 context_index_enabled: false,
1567 metrics_raw_retention_ms: None,
1568 metrics_rollup_policies: Vec::new(),
1569 metrics_tenant_identity: None,
1570 metrics_namespace: None,
1571 append_only: false,
1572 subscriptions: Vec::new(),
1573 }
1574}
1575
1576fn declared_column_contract_from_ddl(
1577 column: &CreateColumnDef,
1578) -> crate::physical::DeclaredColumnContract {
1579 crate::physical::DeclaredColumnContract {
1580 name: column.name.clone(),
1581 data_type: column.data_type.clone(),
1582 sql_type: Some(column.sql_type.clone()),
1583 not_null: column.not_null,
1584 default: column.default.clone(),
1585 compress: column.compress,
1586 unique: column.unique,
1587 primary_key: column.primary_key,
1588 enum_variants: column.enum_variants.clone(),
1589 array_element: column.array_element.clone(),
1590 decimal_precision: column.decimal_precision,
1591 }
1592}
1593
1594fn apply_alter_operations_to_contract(
1595 contract: &mut crate::physical::CollectionContract,
1596 operations: &[AlterOperation],
1597) {
1598 if contract.table_def.is_none() {
1599 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1600 }
1601 for operation in operations {
1602 match operation {
1603 AlterOperation::AddColumn(column) => {
1604 if !contract
1605 .declared_columns
1606 .iter()
1607 .any(|existing| existing.name == column.name)
1608 {
1609 contract
1610 .declared_columns
1611 .push(declared_column_contract_from_ddl(column));
1612 }
1613 if let Some(table_def) = contract.table_def.as_mut() {
1614 if table_def.get_column(&column.name).is_none() {
1615 if let Ok(column_def) = column_def_from_ddl(column) {
1616 if column.primary_key {
1617 table_def.primary_key.push(column.name.clone());
1618 table_def.constraints.push(
1619 crate::storage::schema::Constraint::new(
1620 format!("pk_{}", column.name),
1621 crate::storage::schema::ConstraintType::PrimaryKey,
1622 )
1623 .on_columns(vec![column.name.clone()]),
1624 );
1625 }
1626 if column.unique {
1627 table_def.constraints.push(
1628 crate::storage::schema::Constraint::new(
1629 format!("uniq_{}", column.name),
1630 crate::storage::schema::ConstraintType::Unique,
1631 )
1632 .on_columns(vec![column.name.clone()]),
1633 );
1634 }
1635 if column.not_null {
1636 table_def.constraints.push(
1637 crate::storage::schema::Constraint::new(
1638 format!("not_null_{}", column.name),
1639 crate::storage::schema::ConstraintType::NotNull,
1640 )
1641 .on_columns(vec![column.name.clone()]),
1642 );
1643 }
1644 table_def.columns.push(column_def);
1645 }
1646 }
1647 }
1648 }
1649 AlterOperation::DropColumn(name) => {
1650 contract
1651 .declared_columns
1652 .retain(|column| column.name != *name);
1653 if let Some(table_def) = contract.table_def.as_mut() {
1654 if let Some(index) = table_def.column_index(name) {
1655 table_def.columns.remove(index);
1656 }
1657 table_def.primary_key.retain(|column| column != name);
1658 table_def.constraints.retain(|constraint| {
1659 !constraint.columns.iter().any(|column| column == name)
1660 });
1661 table_def
1662 .indexes
1663 .retain(|index| !index.columns.iter().any(|column| column == name));
1664 }
1665 }
1666 AlterOperation::RenameColumn { from, to } => {
1667 if contract
1668 .declared_columns
1669 .iter()
1670 .any(|column| column.name == *to)
1671 {
1672 continue;
1673 }
1674 if let Some(column) = contract
1675 .declared_columns
1676 .iter_mut()
1677 .find(|column| column.name == *from)
1678 {
1679 column.name = to.clone();
1680 }
1681 if let Some(table_def) = contract.table_def.as_mut() {
1682 if let Some(column) = table_def
1683 .columns
1684 .iter_mut()
1685 .find(|column| column.name == *from)
1686 {
1687 column.name = to.clone();
1688 }
1689 for primary_key in &mut table_def.primary_key {
1690 if *primary_key == *from {
1691 *primary_key = to.clone();
1692 }
1693 }
1694 for constraint in &mut table_def.constraints {
1695 for column in &mut constraint.columns {
1696 if *column == *from {
1697 *column = to.clone();
1698 }
1699 }
1700 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1701 for column in ref_columns {
1702 if *column == *from {
1703 *column = to.clone();
1704 }
1705 }
1706 }
1707 }
1708 for index in &mut table_def.indexes {
1709 for column in &mut index.columns {
1710 if *column == *from {
1711 *column = to.clone();
1712 }
1713 }
1714 }
1715 }
1716 }
1717 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1720 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1724 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1727 AlterOperation::SetAppendOnly(on) => {
1728 contract.append_only = *on;
1729 }
1730 AlterOperation::SetVersioned(_) => {}
1733 AlterOperation::EnableEvents(subscription) => {
1734 let mut subscription = subscription.clone();
1735 subscription.source = contract.name.clone();
1736 subscription.enabled = true;
1737 if let Some(existing) = contract
1738 .subscriptions
1739 .iter_mut()
1740 .find(|existing| existing.target_queue == subscription.target_queue)
1741 {
1742 *existing = subscription;
1743 } else {
1744 contract.subscriptions.push(subscription);
1745 }
1746 }
1747 AlterOperation::DisableEvents => {
1748 for subscription in &mut contract.subscriptions {
1749 subscription.enabled = false;
1750 }
1751 }
1752 AlterOperation::AddSubscription { name, descriptor } => {
1753 let mut sub = descriptor.clone();
1754 sub.name = name.clone();
1755 sub.source = contract.name.clone();
1756 sub.enabled = true;
1757 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1758 {
1759 *existing = sub;
1760 } else {
1761 contract.subscriptions.push(sub);
1762 }
1763 }
1764 AlterOperation::DropSubscription { name } => {
1765 contract.subscriptions.retain(|s| s.name != *name);
1766 }
1767 }
1768 }
1769}
1770
1771fn validate_event_subscriptions(
1772 runtime: &RedDBRuntime,
1773 source: &str,
1774 subscriptions: &[crate::catalog::SubscriptionDescriptor],
1775) -> RedDBResult<()> {
1776 for subscription in subscriptions
1777 .iter()
1778 .filter(|subscription| subscription.enabled)
1779 {
1780 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1781 return Err(RedDBError::Query(
1782 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1783 ));
1784 }
1785 validate_subscription_auth(runtime, source, subscription)?;
1786 if subscription.target_queue == source
1787 || subscription_would_create_cycle(
1788 &runtime.inner.db,
1789 source,
1790 &subscription.target_queue,
1791 )
1792 {
1793 return Err(RedDBError::Query(
1794 "subscription would create cycle".to_string(),
1795 ));
1796 }
1797 audit_subscription_redact_gap(runtime, source, subscription);
1798 }
1799 Ok(())
1800}
1801
1802fn validate_subscription_auth(
1803 runtime: &RedDBRuntime,
1804 source: &str,
1805 subscription: &crate::catalog::SubscriptionDescriptor,
1806) -> RedDBResult<()> {
1807 let auth_store = match runtime.inner.auth_store.read().clone() {
1808 Some(store) => store,
1809 None => return Ok(()),
1810 };
1811 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1812 Some(identity) => identity,
1813 None => return Ok(()),
1814 };
1815 let tenant = crate::runtime::impl_core::current_tenant();
1816 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1817
1818 if auth_store.iam_authorization_enabled() {
1819 let ctx = crate::auth::policies::EvalContext {
1820 principal_tenant: tenant.clone(),
1821 current_tenant: tenant.clone(),
1822 peer_ip: None,
1823 mfa_present: false,
1824 now_ms: crate::auth::now_ms(),
1825 principal_is_admin_role: role == crate::auth::Role::Admin,
1826 };
1827 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
1828 if let Some(t) = tenant.as_deref() {
1829 source_resource = source_resource.with_tenant(t.to_string());
1830 }
1831 if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
1832 return Err(RedDBError::Query(format!(
1833 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
1834 principal, source_resource.kind, source_resource.name
1835 )));
1836 }
1837
1838 let mut target_resource =
1839 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
1840 if let Some(t) = tenant.as_deref() {
1841 target_resource = target_resource.with_tenant(t.to_string());
1842 }
1843 if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
1844 return Err(RedDBError::Query(format!(
1845 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
1846 principal, target_resource.kind, target_resource.name
1847 )));
1848 }
1849 return Ok(());
1850 }
1851
1852 let ctx = crate::auth::privileges::AuthzContext {
1853 principal: &username,
1854 effective_role: role,
1855 tenant: tenant.as_deref(),
1856 };
1857 auth_store
1858 .check_grant(
1859 &ctx,
1860 crate::auth::privileges::Action::Select,
1861 &crate::auth::privileges::Resource::table_from_name(source),
1862 )
1863 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1864 auth_store
1865 .check_grant(
1866 &ctx,
1867 crate::auth::privileges::Action::Insert,
1868 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
1869 )
1870 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1871 Ok(())
1872}
1873
1874fn audit_subscription_redact_gap(
1875 runtime: &RedDBRuntime,
1876 source: &str,
1877 subscription: &crate::catalog::SubscriptionDescriptor,
1878) {
1879 let auth_store = match runtime.inner.auth_store.read().clone() {
1880 Some(store) if store.iam_authorization_enabled() => store,
1881 _ => return,
1882 };
1883 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1884 Some(identity) => identity,
1885 None => return,
1886 };
1887 let tenant = crate::runtime::impl_core::current_tenant();
1888 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1889 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
1890 if missing.is_empty() {
1891 return;
1892 }
1893
1894 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
1895 tracing::warn!(
1896 target: "reddb::operator",
1897 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
1898 source,
1899 subscription.target_queue,
1900 columns
1901 );
1902 let mut event = AuditEvent::builder("subscription_redact_gap")
1903 .principal(username)
1904 .source(AuditAuthSource::System)
1905 .resource(format!(
1906 "subscription:{}->{}",
1907 source, subscription.target_queue
1908 ))
1909 .outcome(Outcome::Success)
1910 .field(AuditFieldEscaper::field("source", source))
1911 .field(AuditFieldEscaper::field(
1912 "target_queue",
1913 subscription.target_queue.clone(),
1914 ))
1915 .field(AuditFieldEscaper::field(
1916 "subscription",
1917 subscription.name.clone(),
1918 ))
1919 .field(AuditFieldEscaper::field("columns", columns))
1920 .field(AuditFieldEscaper::field("role", role.as_str()));
1921 if let Some(t) = tenant {
1922 event = event.tenant(t);
1923 }
1924 runtime.inner.audit_log.record_event(event.build());
1925}
1926
1927fn subscription_redact_gap_columns(
1928 auth_store: &crate::auth::store::AuthStore,
1929 principal: &crate::auth::UserId,
1930 source: &str,
1931 subscription: &crate::catalog::SubscriptionDescriptor,
1932) -> BTreeSet<String> {
1933 let redacted: HashSet<String> = subscription
1934 .redact_fields
1935 .iter()
1936 .map(|field| field.to_ascii_lowercase())
1937 .collect();
1938 auth_store
1939 .effective_policies(principal)
1940 .iter()
1941 .flat_map(|policy| policy.statements.iter())
1942 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
1943 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
1944 .flat_map(|statement| statement.resources.iter())
1945 .filter_map(|resource| denied_column_for_source(resource, source))
1946 .filter(|column| !redact_covers_column(&redacted, source, column))
1947 .collect()
1948}
1949
1950fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
1951 match pattern {
1952 crate::auth::policies::ActionPattern::Wildcard => true,
1953 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
1954 crate::auth::policies::ActionPattern::Prefix(prefix) => {
1955 "select".len() > prefix.len() + 1
1956 && "select".starts_with(prefix)
1957 && "select".as_bytes()[prefix.len()] == b':'
1958 }
1959 }
1960}
1961
1962fn denied_column_for_source(
1963 resource: &crate::auth::policies::ResourcePattern,
1964 source: &str,
1965) -> Option<String> {
1966 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
1967 return None;
1968 };
1969 if kind != "column" {
1970 return None;
1971 }
1972 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
1973 (column.table_resource_name() == source).then_some(column.column)
1974}
1975
1976fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
1977 let column = column.to_ascii_lowercase();
1978 let qualified = format!("{}.{}", source.to_ascii_lowercase(), column);
1979 redacted.contains("*") || redacted.contains(&column) || redacted.contains(&qualified)
1980}
1981
1982fn subscription_would_create_cycle(
1983 db: &crate::storage::unified::devx::RedDB,
1984 source: &str,
1985 target: &str,
1986) -> bool {
1987 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
1988 for contract in db.collection_contracts() {
1989 for subscription in contract
1990 .subscriptions
1991 .into_iter()
1992 .filter(|subscription| subscription.enabled)
1993 {
1994 graph
1995 .entry(subscription.source)
1996 .or_default()
1997 .push(subscription.target_queue);
1998 }
1999 }
2000 graph
2001 .entry(source.to_string())
2002 .or_default()
2003 .push(target.to_string());
2004
2005 let mut stack = vec![target.to_string()];
2006 let mut seen = HashSet::new();
2007 while let Some(node) = stack.pop() {
2008 if node == source {
2009 return true;
2010 }
2011 if !seen.insert(node.clone()) {
2012 continue;
2013 }
2014 if let Some(next) = graph.get(&node) {
2015 stack.extend(next.iter().cloned());
2016 }
2017 }
2018 false
2019}
2020
2021pub(crate) fn ensure_event_target_queue_pub(
2022 runtime: &RedDBRuntime,
2023 queue: &str,
2024) -> RedDBResult<()> {
2025 ensure_event_target_queue(runtime, queue)
2026}
2027
2028fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2029 let store = runtime.inner.db.store();
2030 if store.get_collection(queue).is_some() {
2031 return Ok(());
2032 }
2033 store
2034 .create_collection(queue)
2035 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2036 runtime
2037 .inner
2038 .db
2039 .save_collection_contract(event_queue_collection_contract(queue))
2040 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2041 store.set_config_tree(
2042 &format!("queue.{queue}.mode"),
2043 &crate::serde_json::Value::String("fanout".to_string()),
2044 );
2045 Ok(())
2046}
2047
2048fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2049 let now = current_unix_ms();
2050 crate::physical::CollectionContract {
2051 name: queue.to_string(),
2052 declared_model: crate::catalog::CollectionModel::Queue,
2053 schema_mode: crate::catalog::SchemaMode::Dynamic,
2054 origin: crate::physical::ContractOrigin::Implicit,
2055 version: 1,
2056 created_at_unix_ms: now,
2057 updated_at_unix_ms: now,
2058 default_ttl_ms: None,
2059 vector_dimension: None,
2060 vector_metric: None,
2061 context_index_fields: Vec::new(),
2062 declared_columns: Vec::new(),
2063 table_def: None,
2064 timestamps_enabled: false,
2065 context_index_enabled: false,
2066 metrics_raw_retention_ms: None,
2067 metrics_rollup_policies: Vec::new(),
2068 metrics_tenant_identity: None,
2069 metrics_namespace: None,
2070 append_only: true,
2071 subscriptions: Vec::new(),
2072 }
2073}
2074
2075fn build_table_def_from_create_table(
2076 query: &CreateTableQuery,
2077) -> RedDBResult<crate::storage::schema::TableDef> {
2078 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2079 for column in &query.columns {
2080 if column.primary_key {
2081 table.primary_key.push(column.name.clone());
2082 table.constraints.push(
2083 crate::storage::schema::Constraint::new(
2084 format!("pk_{}", column.name),
2085 crate::storage::schema::ConstraintType::PrimaryKey,
2086 )
2087 .on_columns(vec![column.name.clone()]),
2088 );
2089 }
2090 if column.unique {
2091 table.constraints.push(
2092 crate::storage::schema::Constraint::new(
2093 format!("uniq_{}", column.name),
2094 crate::storage::schema::ConstraintType::Unique,
2095 )
2096 .on_columns(vec![column.name.clone()]),
2097 );
2098 }
2099 if column.not_null {
2100 table.constraints.push(
2101 crate::storage::schema::Constraint::new(
2102 format!("not_null_{}", column.name),
2103 crate::storage::schema::ConstraintType::NotNull,
2104 )
2105 .on_columns(vec![column.name.clone()]),
2106 );
2107 }
2108 table.columns.push(column_def_from_ddl(column)?);
2109 }
2110 if query.timestamps {
2115 table.columns.push(
2116 crate::storage::schema::ColumnDef::new(
2117 "created_at".to_string(),
2118 crate::storage::schema::DataType::UnsignedInteger,
2119 )
2120 .not_null(),
2121 );
2122 table.columns.push(
2123 crate::storage::schema::ColumnDef::new(
2124 "updated_at".to_string(),
2125 crate::storage::schema::DataType::UnsignedInteger,
2126 )
2127 .not_null(),
2128 );
2129 table.constraints.push(
2130 crate::storage::schema::Constraint::new(
2131 "not_null_created_at".to_string(),
2132 crate::storage::schema::ConstraintType::NotNull,
2133 )
2134 .on_columns(vec!["created_at".to_string()]),
2135 );
2136 table.constraints.push(
2137 crate::storage::schema::Constraint::new(
2138 "not_null_updated_at".to_string(),
2139 crate::storage::schema::ConstraintType::NotNull,
2140 )
2141 .on_columns(vec!["updated_at".to_string()]),
2142 );
2143 }
2144 table
2145 .validate()
2146 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2147 Ok(table)
2148}
2149
2150fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2151 let data_type = resolve_declared_data_type(&column.data_type)
2152 .map_err(|err| RedDBError::Query(err.to_string()))?;
2153 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2154 if column.not_null {
2155 column_def = column_def.not_null();
2156 }
2157 if let Some(default) = &column.default {
2158 column_def = column_def.with_default(default.as_bytes().to_vec());
2159 }
2160 if column.compress.unwrap_or(0) > 0 {
2161 column_def = column_def.compressed();
2162 }
2163 if !column.enum_variants.is_empty() {
2164 column_def = column_def.with_variants(column.enum_variants.clone());
2165 }
2166 if let Some(precision) = column.decimal_precision {
2167 column_def = column_def.with_precision(precision);
2168 }
2169 if let Some(element_type) = &column.array_element {
2170 column_def = column_def.with_element_type(
2171 resolve_declared_data_type(element_type)
2172 .map_err(|err| RedDBError::Query(err.to_string()))?,
2173 );
2174 }
2175 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2176 if column.unique {
2177 column_def = column_def.with_metadata("unique", "true");
2178 }
2179 if column.primary_key {
2180 column_def = column_def.with_metadata("primary_key", "true");
2181 }
2182 Ok(column_def)
2183}
2184
2185fn current_unix_ms() -> u128 {
2186 std::time::SystemTime::now()
2187 .duration_since(std::time::UNIX_EPOCH)
2188 .unwrap_or_default()
2189 .as_millis()
2190}
2191
2192#[cfg(test)]
2193mod tests {
2194 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2195 use crate::auth::store::{AuthStore, PrincipalRef};
2196 use crate::auth::UserId;
2197 use crate::auth::{AuthConfig, Role};
2198 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2199 use crate::storage::schema::Value;
2200 use crate::{RedDBOptions, RedDBRuntime};
2201 use std::sync::Arc;
2202
2203 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2204 Policy {
2205 id: id.to_string(),
2206 version: 1,
2207 tenant: None,
2208 created_at: 0,
2209 updated_at: 0,
2210 statements: vec![Statement {
2211 sid: None,
2212 effect: Effect::Allow,
2213 actions: vec![ActionPattern::Exact(action.to_string())],
2214 resources: vec![ResourcePattern::Exact {
2215 kind: "collection".to_string(),
2216 name: collection.to_string(),
2217 }],
2218 condition: None,
2219 }],
2220 }
2221 }
2222
2223 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2224 let store = Arc::new(AuthStore::new(AuthConfig::default()));
2225 *rt.inner.auth_store.write() = Some(store.clone());
2226 store
2227 }
2228
2229 #[test]
2230 fn drop_denied_without_iam_policy() {
2231 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2232 rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2233 let store = wire_auth_store(&rt);
2234 let select_only = Policy {
2236 id: "select-only".to_string(),
2237 version: 1,
2238 tenant: None,
2239 created_at: 0,
2240 updated_at: 0,
2241 statements: vec![Statement {
2242 sid: None,
2243 effect: Effect::Allow,
2244 actions: vec![ActionPattern::Exact("select".to_string())],
2245 resources: vec![ResourcePattern::Wildcard],
2246 condition: None,
2247 }],
2248 };
2249 store.put_policy_internal(select_only).unwrap();
2250 let alice = UserId::from_parts(None, "alice");
2251 store
2252 .attach_policy(PrincipalRef::User(alice), "select-only")
2253 .unwrap();
2254 set_current_auth_identity("alice".to_string(), Role::Write);
2255 let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2256 clear_current_auth_identity();
2257 assert!(
2258 format!("{err}").contains("denied by IAM policy"),
2259 "got: {err}"
2260 );
2261 }
2262
2263 #[test]
2264 fn drop_allowed_with_explicit_iam_policy() {
2265 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2266 rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2267 let store = wire_auth_store(&rt);
2268 let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2269 store.put_policy_internal(policy).unwrap();
2270 let bob = UserId::from_parts(None, "bob");
2271 store
2272 .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2273 .unwrap();
2274 set_current_auth_identity("bob".to_string(), Role::Write);
2275 rt.execute_query("DROP TABLE bar").unwrap();
2276 clear_current_auth_identity();
2277 }
2278
2279 #[test]
2280 fn drop_allowed_with_wildcard_iam_policy() {
2281 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2282 rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2283 let store = wire_auth_store(&rt);
2284 let policy = Policy {
2285 id: "allow-drop-all".to_string(),
2286 version: 1,
2287 tenant: None,
2288 created_at: 0,
2289 updated_at: 0,
2290 statements: vec![Statement {
2291 sid: None,
2292 effect: Effect::Allow,
2293 actions: vec![ActionPattern::Exact("drop".to_string())],
2294 resources: vec![ResourcePattern::Wildcard],
2295 condition: None,
2296 }],
2297 };
2298 store.put_policy_internal(policy).unwrap();
2299 let carl = UserId::from_parts(None, "carl");
2300 store
2301 .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2302 .unwrap();
2303 set_current_auth_identity("carl".to_string(), Role::Write);
2304 rt.execute_query("DROP TABLE baz").unwrap();
2305 clear_current_auth_identity();
2306 }
2307
2308 #[test]
2309 fn truncate_denied_without_iam_policy() {
2310 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2311 rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2312 let store = wire_auth_store(&rt);
2313 let select_only = Policy {
2315 id: "select-only-2".to_string(),
2316 version: 1,
2317 tenant: None,
2318 created_at: 0,
2319 updated_at: 0,
2320 statements: vec![Statement {
2321 sid: None,
2322 effect: Effect::Allow,
2323 actions: vec![ActionPattern::Exact("select".to_string())],
2324 resources: vec![ResourcePattern::Wildcard],
2325 condition: None,
2326 }],
2327 };
2328 store.put_policy_internal(select_only).unwrap();
2329 let dana = UserId::from_parts(None, "dana");
2330 store
2331 .attach_policy(PrincipalRef::User(dana), "select-only-2")
2332 .unwrap();
2333 set_current_auth_identity("dana".to_string(), Role::Write);
2334 let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2335 clear_current_auth_identity();
2336 assert!(
2337 format!("{err}").contains("denied by IAM policy"),
2338 "got: {err}"
2339 );
2340 }
2341
2342 #[test]
2343 fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2344 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2345 rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2346 .unwrap();
2347 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2348 .unwrap();
2349 rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2350 .unwrap();
2351
2352 let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2353 assert_eq!(truncated.statement_type, "truncate");
2354 assert_eq!(truncated.affected_rows, 0);
2355
2356 let empty = rt.execute_query("SELECT id FROM users").unwrap();
2357 assert!(empty.result.records.is_empty());
2358
2359 rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2360 .unwrap();
2361 let selected = rt
2362 .execute_query("SELECT name FROM users WHERE id = 3")
2363 .unwrap();
2364 let name = selected.result.records[0].get("name").unwrap();
2365 assert_eq!(name, &Value::text("cy"));
2366 assert!(rt.db().collection_contract("users").is_some());
2367 assert!(rt
2368 .inner
2369 .index_store
2370 .list_indices("users")
2371 .iter()
2372 .any(|index| index.name == "idx_users_id"));
2373 }
2374
2375 #[test]
2376 fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2377 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2378 rt.execute_query("CREATE QUEUE tasks").unwrap();
2379 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2380
2381 let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2382 assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2383
2384 rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2385 let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2386 assert_eq!(
2387 len.result.records[0].get("len"),
2388 Some(&Value::UnsignedInteger(0))
2389 );
2390 }
2391
2392 #[test]
2393 fn truncate_system_schema_is_read_only() {
2394 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2395 let err = rt
2396 .execute_query("TRUNCATE COLLECTION red.collections")
2397 .unwrap_err();
2398 assert!(format!("{err}").contains("system schema is read-only"));
2399 }
2400
2401 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2404 let result = rt
2405 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2406 .expect("peek queue");
2407 result
2408 .result
2409 .records
2410 .iter()
2411 .map(
2412 |record| match record.get("payload").expect("payload column") {
2413 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2414 other => panic!("expected JSON queue payload, got {other:?}"),
2415 },
2416 )
2417 .collect()
2418 }
2419
2420 #[test]
2423 fn truncate_event_enabled_table_emits_single_truncate_event() {
2424 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2425 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2426 .unwrap();
2427 rt.execute_query(
2428 "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2429 )
2430 .unwrap();
2431
2432 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2434
2435 rt.execute_query("TRUNCATE TABLE users").unwrap();
2436
2437 let events = queue_payloads(&rt, "users_events");
2438 assert_eq!(
2440 events.len(),
2441 1,
2442 "expected 1 truncate event, got {}",
2443 events.len()
2444 );
2445 let ev = events[0].as_object().expect("event is object");
2446 assert_eq!(
2447 ev.get("op").and_then(crate::json::Value::as_str),
2448 Some("truncate")
2449 );
2450 assert_eq!(
2451 ev.get("collection").and_then(crate::json::Value::as_str),
2452 Some("users")
2453 );
2454 assert_eq!(
2455 ev.get("entities_count")
2456 .and_then(crate::json::Value::as_u64),
2457 Some(3)
2458 );
2459 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2460 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2461 assert!(ev
2462 .get("event_id")
2463 .and_then(crate::json::Value::as_str)
2464 .is_some_and(|s| !s.is_empty()));
2465 }
2466
2467 #[test]
2469 fn truncate_no_events_collection_emits_nothing() {
2470 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2471 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2472 .unwrap();
2473 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2474 .unwrap();
2475 rt.execute_query("TRUNCATE TABLE plain").unwrap();
2477 let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2479 assert!(rows.result.records.is_empty());
2480 }
2481
2482 #[test]
2486 fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2487 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2488 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2489 .unwrap();
2490 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2491 .unwrap();
2492
2493 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2495
2496 rt.execute_query("DROP TABLE users").unwrap();
2497
2498 let events = queue_payloads(&rt, "users_events");
2500 assert_eq!(
2501 events.len(),
2502 1,
2503 "expected 1 collection_dropped event, got {}",
2504 events.len()
2505 );
2506 let ev = events[0].as_object().expect("event is object");
2507 assert_eq!(
2508 ev.get("op").and_then(crate::json::Value::as_str),
2509 Some("collection_dropped")
2510 );
2511 assert_eq!(
2512 ev.get("collection").and_then(crate::json::Value::as_str),
2513 Some("users")
2514 );
2515 assert_eq!(
2516 ev.get("final_entities_count")
2517 .and_then(crate::json::Value::as_u64),
2518 Some(2)
2519 );
2520 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2521 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2522 assert!(ev
2523 .get("event_id")
2524 .and_then(crate::json::Value::as_str)
2525 .is_some_and(|s| !s.is_empty()));
2526
2527 let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2529 assert!(
2530 format!("{err}").contains("users"),
2531 "expected not-found error"
2532 );
2533 }
2534
2535 #[test]
2538 fn drop_no_events_collection_emits_nothing() {
2539 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2540 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2541 .unwrap();
2542 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2543 .unwrap();
2544 rt.execute_query("DROP TABLE plain").unwrap();
2545 let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2547 assert!(format!("{err}").contains("plain"));
2548 }
2549
2550 #[test]
2554 fn ops_filter_insert_only_ignores_update_and_delete() {
2555 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2556 rt.execute_query(
2557 "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2558 )
2559 .unwrap();
2560 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
2561 .unwrap();
2562 rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
2563 .unwrap();
2564 rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
2565
2566 let events = queue_payloads(&rt, "items_events");
2567 assert_eq!(
2569 events.len(),
2570 1,
2571 "expected 1 insert event, got {}",
2572 events.len()
2573 );
2574 assert_eq!(
2575 events[0]
2576 .as_object()
2577 .unwrap()
2578 .get("op")
2579 .and_then(crate::json::Value::as_str),
2580 Some("insert")
2581 );
2582 }
2583
2584 #[test]
2586 fn where_filter_skips_rows_that_do_not_match() {
2587 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2588 rt.execute_query(
2589 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
2590 )
2591 .unwrap();
2592
2593 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
2595 .unwrap();
2596 rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
2598 .unwrap();
2599
2600 let events = queue_payloads(&rt, "users_events");
2601 assert_eq!(
2602 events.len(),
2603 1,
2604 "expected 1 event (only active), got {}",
2605 events.len()
2606 );
2607 let ev = events[0].as_object().unwrap();
2608 assert_eq!(
2609 ev.get("op").and_then(crate::json::Value::as_str),
2610 Some("insert")
2611 );
2612 let after = ev.get("after").unwrap().as_object().unwrap();
2613 assert_eq!(
2614 after.get("status").and_then(crate::json::Value::as_str),
2615 Some("active")
2616 );
2617 }
2618
2619 #[test]
2621 fn ops_filter_and_where_filter_combined() {
2622 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2623 rt.execute_query(
2624 "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
2625 )
2626 .unwrap();
2627
2628 rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
2630 .unwrap();
2631 rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
2633 .unwrap();
2634 rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
2636 .unwrap();
2637 rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
2639
2640 let events = queue_payloads(&rt, "items_events");
2641 assert_eq!(
2643 events.len(),
2644 1,
2645 "expected 1 event, got {}: {events:?}",
2646 events.len()
2647 );
2648 assert_eq!(
2649 events[0]
2650 .as_object()
2651 .unwrap()
2652 .get("op")
2653 .and_then(crate::json::Value::as_str),
2654 Some("insert")
2655 );
2656 }
2657
2658 #[test]
2660 fn where_filter_on_delete_checks_before_state() {
2661 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2662 rt.execute_query(
2663 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
2664 )
2665 .unwrap();
2666
2667 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
2668 .unwrap();
2669
2670 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
2672 rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
2674
2675 let events = queue_payloads(&rt, "users_events");
2676 assert_eq!(
2677 events.len(),
2678 1,
2679 "expected 1 delete event, got {}",
2680 events.len()
2681 );
2682 let ev = events[0].as_object().unwrap();
2683 assert_eq!(
2684 ev.get("op").and_then(crate::json::Value::as_str),
2685 Some("delete")
2686 );
2687 }
2688
2689 #[test]
2693 fn alter_add_column_on_event_enabled_table_succeeds() {
2694 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2695 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2696 .unwrap();
2697 rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
2699 .unwrap();
2700 let contract = rt.db().collection_contract("users").unwrap();
2702 assert!(
2703 contract.declared_columns.iter().any(|c| c.name == "phone"),
2704 "phone column should be in contract"
2705 );
2706 assert!(
2708 contract.subscriptions.iter().any(|s| s.enabled),
2709 "subscription should remain enabled"
2710 );
2711 }
2712
2713 #[test]
2716 fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
2717 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2718 rt.execute_query(
2719 "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
2720 )
2721 .unwrap();
2722 rt.execute_query("ALTER TABLE items DROP COLUMN secret")
2724 .unwrap();
2725 let contract = rt.db().collection_contract("items").unwrap();
2726 assert!(
2727 !contract.declared_columns.iter().any(|c| c.name == "secret"),
2728 "secret column should be removed"
2729 );
2730 rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
2732 .unwrap();
2733 assert!(
2735 contract.subscriptions.iter().any(|s| s.enabled),
2736 "subscription should remain enabled"
2737 );
2738 }
2739}