1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault KV key under which a collection's own master key is stored.
///
/// The returned key has the shape `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 crate::reserved_fields::ensure_no_reserved_public_item_fields(
37 query.columns.iter().map(|column| column.name.as_str()),
38 &format!("table '{}'", query.name),
39 )?;
40 let exists = store.get_collection(&query.name).is_some();
42 if exists {
43 if query.if_not_exists {
44 return Ok(RuntimeQueryResult::ok_message(
45 raw_query.to_string(),
46 &format!("table '{}' already exists", query.name),
47 "create",
48 ));
49 }
50 return Err(RedDBError::Query(format!(
51 "table '{}' already exists",
52 query.name
53 )));
54 }
55
56 let contract = collection_contract_from_create_table(query)?;
59 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60 store
62 .create_collection(&query.name)
63 .map_err(|err| RedDBError::Internal(err.to_string()))?;
64 for subscription in &contract.subscriptions {
65 ensure_event_target_queue(self, &subscription.target_queue)?;
66 }
67 if let Some(default_ttl_ms) = query.default_ttl_ms {
68 self.inner
69 .db
70 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71 }
72 self.inner
73 .db
74 .save_collection_contract(contract)
75 .map_err(|err| RedDBError::Internal(err.to_string()))?;
76 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77 store.set_config_tree(
78 &format!("red.collection_tenants.{}", query.name),
79 &crate::serde_json::Value::String(tenant_id),
80 );
81 }
82 self.inner
83 .db
84 .persist_metadata()
85 .map_err(|err| RedDBError::Internal(err.to_string()))?;
86 self.refresh_table_planner_stats(&query.name);
87 self.invalidate_result_cache();
88 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99 if let Some(spec) = &query.partition_by {
106 let kind_str = match spec.kind {
107 crate::storage::query::ast::PartitionKind::Range => "range",
108 crate::storage::query::ast::PartitionKind::List => "list",
109 crate::storage::query::ast::PartitionKind::Hash => "hash",
110 };
111 store.set_config_tree(
112 &format!("partition.{}.by", query.name),
113 &crate::serde_json::Value::String(kind_str.to_string()),
114 );
115 store.set_config_tree(
116 &format!("partition.{}.column", query.name),
117 &crate::serde_json::Value::String(spec.column.clone()),
118 );
119 }
120
121 if let Some(col) = &query.tenant_by {
132 store.set_config_tree(
133 &format!("tenant_tables.{}.column", query.name),
134 &crate::serde_json::Value::String(col.clone()),
135 );
136 self.register_tenant_table(&query.name, col);
137 }
138
139 let ttl_suffix = query
140 .default_ttl_ms
141 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142 .unwrap_or_default();
143
144 let tenant_suffix = query
145 .tenant_by
146 .as_ref()
147 .map(|col| format!(" (tenant-scoped by {col})"))
148 .unwrap_or_default();
149
150 Ok(RuntimeQueryResult::ok_message(
151 raw_query.to_string(),
152 &format!(
153 "table '{}' created{}{}",
154 query.name, ttl_suffix, tenant_suffix
155 ),
156 "create",
157 ))
158 }
159
160 fn execute_create_keyed_collection(
161 &self,
162 raw_query: &str,
163 query: &CreateTableQuery,
164 ) -> RedDBResult<RuntimeQueryResult> {
165 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166 if is_system_schema_name(&query.name) {
167 return Err(RedDBError::Query("system schema is read-only".to_string()));
168 }
169 let store = self.inner.db.store();
170 let label = polymorphic_resolver::model_name(query.collection_model);
171 if store.get_collection(&query.name).is_some() {
172 if query.if_not_exists {
173 return Ok(RuntimeQueryResult::ok_message(
174 raw_query.to_string(),
175 &format!("{label} '{}' already exists", query.name),
176 "create",
177 ));
178 }
179 return Err(RedDBError::Query(format!(
180 "{label} '{}' already exists",
181 query.name
182 )));
183 }
184
185 store
186 .create_collection(&query.name)
187 .map_err(|err| RedDBError::Internal(err.to_string()))?;
188 if query.collection_model == CollectionModel::Vault {
189 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190 let key_scope = if query.vault_own_master_key {
191 "own"
192 } else {
193 "cluster"
194 };
195 store.set_config_tree(
196 &format!("red.vault.{}.key_scope", query.name),
197 &crate::serde_json::Value::String(key_scope.to_string()),
198 );
199 store.set_config_tree(
200 &format!("red.vault.{}.status", query.name),
201 &crate::serde_json::Value::String("sealed".to_string()),
202 );
203 }
204 if query.collection_model == CollectionModel::Metrics {
205 for spec in &query.metrics_rollup_policies {
206 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207 .ok_or_else(|| {
208 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209 })?;
210 if policy.source != "raw" {
211 return Err(RedDBError::Query(format!(
212 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213 spec
214 )));
215 }
216 if !matches!(
217 policy.aggregation.as_str(),
218 "avg" | "sum" | "min" | "max" | "count"
219 ) {
220 return Err(RedDBError::Query(format!(
221 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222 spec
223 )));
224 }
225 }
226 if let Some(raw_retention_ms) = query.default_ttl_ms {
227 self.inner
228 .db
229 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230 store.set_config_tree(
231 &format!("red.metrics.{}.raw_retention_ms", query.name),
232 &crate::serde_json::Value::Number(raw_retention_ms as f64),
233 );
234 }
235 let tenant_identity = query
236 .tenant_by
237 .clone()
238 .unwrap_or_else(|| "current_tenant".to_string());
239 store.set_config_tree(
240 &format!("red.metrics.{}.tenant_identity", query.name),
241 &crate::serde_json::Value::String(tenant_identity),
242 );
243 store.set_config_tree(
244 &format!("red.metrics.{}.namespace", query.name),
245 &crate::serde_json::Value::String("default".to_string()),
246 );
247 if !query.metrics_rollup_policies.is_empty() {
248 store.set_config_tree(
249 &format!("red.metrics.{}.rollup_policies", query.name),
250 &crate::serde_json::Value::Array(
251 query
252 .metrics_rollup_policies
253 .iter()
254 .cloned()
255 .map(crate::serde_json::Value::String)
256 .collect(),
257 ),
258 );
259 }
260 }
261 let contract = if query.collection_model == CollectionModel::Metrics {
262 metrics_collection_contract(query)
263 } else {
264 keyed_collection_contract(
265 &query.name,
266 query.collection_model,
267 query.analytics_config.clone(),
268 )
269 };
270 self.inner
271 .db
272 .save_collection_contract(contract)
273 .map_err(|err| RedDBError::Internal(err.to_string()))?;
274 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
275 store.set_config_tree(
276 &format!("red.collection_tenants.{}", query.name),
277 &crate::serde_json::Value::String(tenant_id),
278 );
279 }
280 self.inner
281 .db
282 .persist_metadata()
283 .map_err(|err| RedDBError::Internal(err.to_string()))?;
284 self.invalidate_result_cache();
285
286 Ok(RuntimeQueryResult::ok_message(
287 raw_query.to_string(),
288 &format!("{label} '{}' created", query.name),
289 "create",
290 ))
291 }
292
293 pub fn ensure_system_graph_with_analytics(
302 &self,
303 name: &str,
304 outputs: &[crate::catalog::AnalyticsOutput],
305 ) -> RedDBResult<()> {
306 let store = self.inner.db.store();
307 if store.get_collection(name).is_some() {
308 return Ok(());
309 }
310 let analytics_config = outputs
311 .iter()
312 .map(|output| crate::catalog::AnalyticsViewDescriptor {
313 output: *output,
314 algorithm: None,
315 resolution: None,
316 max_iterations: None,
317 tolerance: None,
318 })
319 .collect();
320 store
321 .create_collection(name)
322 .map_err(|err| RedDBError::Internal(err.to_string()))?;
323 let contract = keyed_collection_contract(
324 name,
325 crate::catalog::CollectionModel::Graph,
326 analytics_config,
327 );
328 self.inner
329 .db
330 .save_collection_contract(contract)
331 .map_err(|err| RedDBError::Internal(err.to_string()))?;
332 self.inner
333 .db
334 .persist_metadata()
335 .map_err(|err| RedDBError::Internal(err.to_string()))?;
336 self.invalidate_result_cache();
337 Ok(())
338 }
339
340 pub fn execute_create_collection(
341 &self,
342 raw_query: &str,
343 query: &CreateCollectionQuery,
344 ) -> RedDBResult<RuntimeQueryResult> {
345 let model = match query.kind.as_str() {
346 "graph" => CollectionModel::Graph,
347 "document" => CollectionModel::Document,
348 "metrics" => CollectionModel::Metrics,
349 "vector.turbo" => {
350 let dimension = query.vector_dimension.ok_or_else(|| {
351 RedDBError::Query(
352 "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
353 )
354 })?;
355 let create = CreateVectorQuery {
356 name: query.name.clone(),
357 dimension,
358 metric: query
359 .vector_metric
360 .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
361 if_not_exists: query.if_not_exists,
362 };
363 let result = self.execute_create_vector(raw_query, &create)?;
364 let store = self.inner.db.store();
371 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
372 self.inner
373 .db
374 .persist_metadata()
375 .map_err(|err| RedDBError::Internal(err.to_string()))?;
376 let _ = self.inner.db.turbo_state(&query.name);
381 return Ok(result);
382 }
383 "blockchain" => CollectionModel::Table,
388 other => {
389 return Err(RedDBError::Query(format!(
390 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
391 )));
392 }
393 };
394 let create = CreateTableQuery {
395 collection_model: model,
396 name: query.name.clone(),
397 columns: Vec::new(),
398 if_not_exists: query.if_not_exists,
399 default_ttl_ms: None,
400 metrics_rollup_policies: Vec::new(),
401 context_index_fields: Vec::new(),
402 context_index_enabled: false,
403 timestamps: false,
404 partition_by: None,
405 tenant_by: None,
406 append_only: false,
407 subscriptions: Vec::new(),
408 analytics_config: Vec::new(),
409 vault_own_master_key: false,
410 };
411 let result = self.execute_create_table(raw_query, &create)?;
412 if query.kind == "blockchain" {
413 self.install_blockchain_kind(&query.name)?;
414 }
415 if !query.allowed_signers.is_empty() {
420 let actor = crate::runtime::impl_core::current_user_projected()
421 .unwrap_or_else(|| "@system/create-collection".to_string());
422 crate::runtime::signed_writes_kind::install(
423 &self.inner.db.store(),
424 &query.name,
425 &query.allowed_signers,
426 &actor,
427 );
428 }
429 Ok(result)
430 }
431
432 fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
436 use crate::runtime::blockchain_kind;
437 use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
438 use std::sync::Arc;
439
440 let store = self.inner.db.store();
441 blockchain_kind::mark_as_chain(&store, name);
442
443 let existing_tip = blockchain_kind::chain_tip(&store, name);
444 if existing_tip.height.is_some() {
445 return Ok(());
446 }
447
448 let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
449 let named: std::collections::HashMap<String, crate::storage::schema::Value> =
450 fields.into_iter().collect();
451 let entity = UnifiedEntity::new(
452 EntityId::new(0),
453 EntityKind::TableRow {
454 table: Arc::from(name),
455 row_id: 0,
456 },
457 EntityData::Row(RowData {
458 columns: Vec::new(),
459 named: Some(named),
460 schema: None,
461 }),
462 );
463 store
464 .insert_auto(name, entity)
465 .map_err(|err| RedDBError::Internal(err.to_string()))?;
466 if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
469 self.inner
470 .chain_tip_cache
471 .lock()
472 .insert(name.to_string(), tip);
473 }
474 Ok(())
475 }
476
477 pub fn execute_create_vector(
478 &self,
479 raw_query: &str,
480 query: &CreateVectorQuery,
481 ) -> RedDBResult<RuntimeQueryResult> {
482 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
483 if is_system_schema_name(&query.name) {
484 return Err(RedDBError::Query("system schema is read-only".to_string()));
485 }
486 let store = self.inner.db.store();
487 if store.get_collection(&query.name).is_some() {
488 if query.if_not_exists {
489 return Ok(RuntimeQueryResult::ok_message(
490 raw_query.to_string(),
491 &format!("vector '{}' already exists", query.name),
492 "create",
493 ));
494 }
495 return Err(RedDBError::Query(format!(
496 "vector '{}' already exists",
497 query.name
498 )));
499 }
500
501 store
502 .create_collection(&query.name)
503 .map_err(|err| RedDBError::Internal(err.to_string()))?;
504 self.inner
505 .db
506 .save_collection_contract(vector_collection_contract(query))
507 .map_err(|err| RedDBError::Internal(err.to_string()))?;
508 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
509 store.set_config_tree(
510 &format!("red.collection_tenants.{}", query.name),
511 &crate::serde_json::Value::String(tenant_id),
512 );
513 }
514 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
523 self.inner
524 .db
525 .persist_metadata()
526 .map_err(|err| RedDBError::Internal(err.to_string()))?;
527 let _ = self.inner.db.turbo_state(&query.name);
532 self.invalidate_result_cache();
533
534 Ok(RuntimeQueryResult::ok_message(
535 raw_query.to_string(),
536 &format!("vector '{}' created", query.name),
537 "create",
538 ))
539 }
540
541 fn provision_vault_key_material(
542 &self,
543 collection: &str,
544 own_master_key: bool,
545 ) -> RedDBResult<()> {
546 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
547 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
548 })?;
549 if !auth_store.is_vault_backed() {
550 return Err(RedDBError::Query(
551 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
552 ));
553 }
554
555 if auth_store.vault_secret_key().is_none() {
556 let key = crate::auth::store::random_bytes(32);
557 auth_store
558 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
559 .map_err(|err| RedDBError::Query(err.to_string()))?;
560 }
561
562 if own_master_key {
563 let key = crate::auth::store::random_bytes(32);
564 auth_store
565 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
566 .map_err(|err| RedDBError::Query(err.to_string()))?;
567 }
568
569 Ok(())
570 }
571
572 pub fn execute_drop_table(
576 &self,
577 raw_query: &str,
578 query: &DropTableQuery,
579 ) -> RedDBResult<RuntimeQueryResult> {
580 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
581 let store = self.inner.db.store();
582
583 if is_system_schema_name(&query.name) {
584 return Err(RedDBError::Query("system schema is read-only".to_string()));
585 }
586
587 let exists = store.get_collection(&query.name).is_some();
588 if !exists {
589 if query.if_exists {
590 return Ok(RuntimeQueryResult::ok_message(
591 raw_query.to_string(),
592 &format!("table '{}' does not exist", query.name),
593 "drop",
594 ));
595 }
596 return Err(RedDBError::NotFound(format!(
597 "table '{}' not found",
598 query.name
599 )));
600 }
601 let actual =
602 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
603 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
604
605 let final_count = store
608 .get_collection(&query.name)
609 .map(|manager| manager.query_all(|_| true).len() as u64)
610 .unwrap_or(0);
611 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
612 self,
613 &query.name,
614 final_count,
615 )?;
616
617 let orphaned_indices: Vec<String> = self
618 .inner
619 .index_store
620 .list_indices(&query.name)
621 .into_iter()
622 .map(|index| index.name)
623 .collect();
624 for name in &orphaned_indices {
625 self.inner.index_store.drop_index(name, &query.name);
626 }
627
628 store
629 .drop_collection(&query.name)
630 .map_err(|err| RedDBError::Internal(err.to_string()))?;
631 self.inner.db.invalidate_vector_index(&query.name);
632 self.inner.db.clear_collection_default_ttl_ms(&query.name);
633 self.inner
634 .db
635 .remove_collection_contract(&query.name)
636 .map_err(|err| RedDBError::Internal(err.to_string()))?;
637 self.clear_table_planner_stats(&query.name);
638 self.invalidate_result_cache();
639 if let Some(store) = self.inner.auth_store.read().clone() {
643 store.invalidate_visible_collections_cache();
644 }
645 self.inner
646 .db
647 .persist_metadata()
648 .map_err(|err| RedDBError::Internal(err.to_string()))?;
649 self.schema_vocabulary_apply(
655 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
656 collection: query.name.clone(),
657 },
658 );
659
660 Ok(RuntimeQueryResult::ok_message(
661 raw_query.to_string(),
662 &format!("table '{}' dropped", query.name),
663 "drop",
664 ))
665 }
666
667 pub fn execute_drop_graph(
668 &self,
669 raw_query: &str,
670 query: &DropGraphQuery,
671 ) -> RedDBResult<RuntimeQueryResult> {
672 self.execute_drop_typed_collection(
673 raw_query,
674 &query.name,
675 query.if_exists,
676 CollectionModel::Graph,
677 "graph",
678 )
679 }
680
681 pub fn execute_drop_vector(
682 &self,
683 raw_query: &str,
684 query: &DropVectorQuery,
685 ) -> RedDBResult<RuntimeQueryResult> {
686 self.execute_drop_typed_collection(
687 raw_query,
688 &query.name,
689 query.if_exists,
690 CollectionModel::Vector,
691 "vector",
692 )
693 }
694
695 pub fn execute_drop_document(
696 &self,
697 raw_query: &str,
698 query: &DropDocumentQuery,
699 ) -> RedDBResult<RuntimeQueryResult> {
700 self.execute_drop_typed_collection(
701 raw_query,
702 &query.name,
703 query.if_exists,
704 CollectionModel::Document,
705 "document",
706 )
707 }
708
709 pub fn execute_drop_kv(
710 &self,
711 raw_query: &str,
712 query: &DropKvQuery,
713 ) -> RedDBResult<RuntimeQueryResult> {
714 let label = polymorphic_resolver::model_name(query.model);
715 self.execute_drop_typed_collection(
716 raw_query,
717 &query.name,
718 query.if_exists,
719 query.model,
720 label,
721 )
722 }
723
724 pub fn execute_drop_collection(
725 &self,
726 raw_query: &str,
727 query: &DropCollectionQuery,
728 ) -> RedDBResult<RuntimeQueryResult> {
729 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
730 if is_system_schema_name(&query.name) {
731 return Err(RedDBError::Query("system schema is read-only".to_string()));
732 }
733 let store = self.inner.db.store();
734 if store.get_collection(&query.name).is_none() {
735 if query.if_exists {
736 return Ok(RuntimeQueryResult::ok_message(
737 raw_query.to_string(),
738 &format!("collection '{}' does not exist", query.name),
739 "drop",
740 ));
741 }
742 return Err(RedDBError::NotFound(format!(
743 "collection '{}' not found",
744 query.name
745 )));
746 }
747
748 let actual =
749 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
750 if let Some(expected) = query.model {
751 polymorphic_resolver::ensure_model_match(expected, actual)?;
752 }
753
754 match actual {
755 CollectionModel::Table => self.execute_drop_table(
756 raw_query,
757 &DropTableQuery {
758 name: query.name.clone(),
759 if_exists: query.if_exists,
760 },
761 ),
762 CollectionModel::TimeSeries => self.execute_drop_timeseries(
763 raw_query,
764 &DropTimeSeriesQuery {
765 name: query.name.clone(),
766 if_exists: query.if_exists,
767 },
768 ),
769 CollectionModel::Queue => self.execute_drop_queue(
770 raw_query,
771 &DropQueueQuery {
772 name: query.name.clone(),
773 if_exists: query.if_exists,
774 },
775 ),
776 CollectionModel::Graph => self.execute_drop_graph(
777 raw_query,
778 &DropGraphQuery {
779 name: query.name.clone(),
780 if_exists: query.if_exists,
781 },
782 ),
783 CollectionModel::Vector => self.execute_drop_vector(
784 raw_query,
785 &DropVectorQuery {
786 name: query.name.clone(),
787 if_exists: query.if_exists,
788 },
789 ),
790 CollectionModel::Document => self.execute_drop_document(
791 raw_query,
792 &DropDocumentQuery {
793 name: query.name.clone(),
794 if_exists: query.if_exists,
795 },
796 ),
797 CollectionModel::Kv => self.execute_drop_kv(
798 raw_query,
799 &DropKvQuery {
800 name: query.name.clone(),
801 if_exists: query.if_exists,
802 model: CollectionModel::Kv,
803 },
804 ),
805 CollectionModel::Config => self.execute_drop_kv(
806 raw_query,
807 &DropKvQuery {
808 name: query.name.clone(),
809 if_exists: query.if_exists,
810 model: CollectionModel::Config,
811 },
812 ),
813 CollectionModel::Vault => self.execute_drop_kv(
814 raw_query,
815 &DropKvQuery {
816 name: query.name.clone(),
817 if_exists: query.if_exists,
818 model: CollectionModel::Vault,
819 },
820 ),
821 CollectionModel::Hll => self.execute_probabilistic_command(
822 raw_query,
823 &ProbabilisticCommand::DropHll {
824 name: query.name.clone(),
825 if_exists: query.if_exists,
826 },
827 ),
828 CollectionModel::Sketch => self.execute_probabilistic_command(
829 raw_query,
830 &ProbabilisticCommand::DropSketch {
831 name: query.name.clone(),
832 if_exists: query.if_exists,
833 },
834 ),
835 CollectionModel::Filter => self.execute_probabilistic_command(
836 raw_query,
837 &ProbabilisticCommand::DropFilter {
838 name: query.name.clone(),
839 if_exists: query.if_exists,
840 },
841 ),
842 CollectionModel::Metrics => self.execute_drop_typed_collection(
843 raw_query,
844 &query.name,
845 query.if_exists,
846 CollectionModel::Metrics,
847 "metrics",
848 ),
849 CollectionModel::Mixed => self.execute_drop_typed_collection(
850 raw_query,
851 &query.name,
852 query.if_exists,
853 CollectionModel::Mixed,
854 "collection",
855 ),
856 }
857 }
858
859 fn require_graph_for_analytics(&self, name: &str) -> RedDBResult<()> {
865 let is_graph = self
866 .inner
867 .db
868 .collection_contract(name)
869 .map(|c| c.declared_model == crate::catalog::CollectionModel::Graph)
870 .unwrap_or(false);
871 if !is_graph {
872 return Err(RedDBError::Query(format!(
873 "ALTER GRAPH ... ANALYTICS: '{name}' is not a graph collection"
874 )));
875 }
876 Ok(())
877 }
878
879 pub fn execute_alter_table(
885 &self,
886 raw_query: &str,
887 query: &AlterTableQuery,
888 ) -> RedDBResult<RuntimeQueryResult> {
889 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
890 let store = self.inner.db.store();
891
892 if store.get_collection(&query.name).is_none() {
894 return Err(RedDBError::NotFound(format!(
895 "table '{}' not found",
896 query.name
897 )));
898 }
899
900 let mut messages = Vec::new();
901
902 let fields_added: Vec<String> = query
904 .operations
905 .iter()
906 .filter_map(|op| {
907 if let AlterOperation::AddColumn(col) = op {
908 Some(col.name.clone())
909 } else {
910 None
911 }
912 })
913 .collect();
914 let fields_removed: Vec<String> = query
915 .operations
916 .iter()
917 .filter_map(|op| {
918 if let AlterOperation::DropColumn(name) = op {
919 Some(name.clone())
920 } else {
921 None
922 }
923 })
924 .collect();
925
926 for op in &query.operations {
927 match op {
928 AlterOperation::AddColumn(col) => {
929 messages.push(format!("column '{}' added", col.name));
931 }
932 AlterOperation::DropColumn(name) => {
933 messages.push(format!("column '{}' dropped", name));
934 }
935 AlterOperation::RenameColumn { from, to } => {
936 messages.push(format!("column '{}' renamed to '{}'", from, to));
937 }
938 AlterOperation::AttachPartition { child, bound } => {
939 store.set_config_tree(
943 &format!("partition.{}.children.{}", query.name, child),
944 &crate::serde_json::Value::String(bound.clone()),
945 );
946 messages.push(format!(
947 "partition '{child}' attached to '{}' ({bound})",
948 query.name
949 ));
950 }
951 AlterOperation::DetachPartition { child } => {
952 store.set_config_tree(
953 &format!("partition.{}.children.{}", query.name, child),
954 &crate::serde_json::Value::Null,
955 );
956 messages.push(format!(
957 "partition '{child}' detached from '{}'",
958 query.name
959 ));
960 }
961 AlterOperation::EnableRowLevelSecurity => {
962 self.inner
963 .rls_enabled_tables
964 .write()
965 .insert(query.name.clone());
966 store.set_config_tree(
968 &format!("rls.enabled.{}", query.name),
969 &crate::serde_json::Value::Bool(true),
970 );
971 self.invalidate_plan_cache();
972 messages.push(format!("row level security enabled on '{}'", query.name));
973 }
974 AlterOperation::DisableRowLevelSecurity => {
975 self.inner.rls_enabled_tables.write().remove(&query.name);
976 store.set_config_tree(
977 &format!("rls.enabled.{}", query.name),
978 &crate::serde_json::Value::Null,
979 );
980 self.invalidate_plan_cache();
981 messages.push(format!("row level security disabled on '{}'", query.name));
982 }
983 AlterOperation::EnableTenancy { column } => {
985 store.set_config_tree(
986 &format!("tenant_tables.{}.column", query.name),
987 &crate::serde_json::Value::String(column.clone()),
988 );
989 self.register_tenant_table(&query.name, column);
990 self.invalidate_plan_cache();
991 messages.push(format!(
992 "tenancy enabled on '{}' by column '{column}'",
993 query.name
994 ));
995 }
996 AlterOperation::DisableTenancy => {
997 store.set_config_tree(
998 &format!("tenant_tables.{}.column", query.name),
999 &crate::serde_json::Value::Null,
1000 );
1001 self.unregister_tenant_table(&query.name);
1002 self.invalidate_plan_cache();
1003 messages.push(format!("tenancy disabled on '{}'", query.name));
1004 }
1005 AlterOperation::SetAppendOnly(on) => {
1006 messages.push(format!(
1011 "append_only {} on '{}'",
1012 if *on { "enabled" } else { "disabled" },
1013 query.name
1014 ));
1015 }
1016 AlterOperation::SetVersioned(on) => {
1017 self.vcs_set_versioned(&query.name, *on)?;
1024 messages.push(format!(
1025 "versioned {} on '{}'",
1026 if *on { "enabled" } else { "disabled" },
1027 query.name
1028 ));
1029 }
1030 AlterOperation::EnableEvents(subscription) => {
1031 let mut subscription = subscription.clone();
1032 subscription.source = query.name.clone();
1033 validate_event_subscriptions(
1034 self,
1035 &query.name,
1036 std::slice::from_ref(&subscription),
1037 )?;
1038 ensure_event_target_queue(self, &subscription.target_queue)?;
1039 messages.push(format!(
1040 "events enabled on '{}' to '{}'",
1041 query.name, subscription.target_queue
1042 ));
1043 }
1044 AlterOperation::DisableEvents => {
1045 messages.push(format!("events disabled on '{}'", query.name));
1046 }
1047 AlterOperation::AddSubscription { name, descriptor } => {
1048 let mut sub = descriptor.clone();
1049 sub.name = name.clone();
1050 sub.source = query.name.clone();
1051 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
1052 ensure_event_target_queue(self, &sub.target_queue)?;
1053 messages.push(format!(
1054 "subscription '{}' added on '{}' to '{}'",
1055 name, query.name, sub.target_queue
1056 ));
1057 }
1058 AlterOperation::DropSubscription { name } => {
1059 messages.push(format!(
1060 "subscription '{}' dropped on '{}'",
1061 name, query.name
1062 ));
1063 }
1064 AlterOperation::AddSigner { pubkey } => {
1065 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1072 return Err(RedDBError::Query(format!(
1073 "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
1074 recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
1075 query.name
1076 )));
1077 }
1078 let actor = crate::runtime::impl_core::current_user_projected()
1079 .unwrap_or_else(|| "@system/alter".to_string());
1080 let changed = crate::runtime::signed_writes_kind::add_signer(
1081 &store,
1082 &query.name,
1083 *pubkey,
1084 &actor,
1085 );
1086 messages.push(format!(
1087 "signer {} on '{}'",
1088 if changed { "added" } else { "already present" },
1089 query.name
1090 ));
1091 }
1092 AlterOperation::RevokeSigner { pubkey } => {
1093 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1094 return Err(RedDBError::Query(format!(
1095 "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1096 query.name
1097 )));
1098 }
1099 let actor = crate::runtime::impl_core::current_user_projected()
1100 .unwrap_or_else(|| "@system/alter".to_string());
1101 let changed = crate::runtime::signed_writes_kind::revoke_signer(
1102 &store,
1103 &query.name,
1104 pubkey,
1105 &actor,
1106 );
1107 messages.push(format!(
1108 "signer {} on '{}'",
1109 if changed {
1110 "revoked"
1111 } else {
1112 "already revoked"
1113 },
1114 query.name
1115 ));
1116 }
1117 AlterOperation::SetRetention { duration_ms } => {
1118 let existing = self.inner.db.collection_contract(&query.name);
1124 let has_ts_column = existing
1125 .as_ref()
1126 .map(retention_timestamp_column_exists)
1127 .unwrap_or(false);
1128 if !has_ts_column {
1129 return Err(RedDBError::Query(format!(
1130 "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1131 column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1132 or enable WITH timestamps = true before setting a \
1133 retention policy",
1134 query.name
1135 )));
1136 }
1137 messages.push(format!(
1138 "retention set to {duration_ms} ms on '{}'",
1139 query.name
1140 ));
1141 }
1142 AlterOperation::UnsetRetention => {
1143 messages.push(format!("retention cleared on '{}'", query.name));
1144 }
1145 AlterOperation::AddAnalytics(views) => {
1146 self.require_graph_for_analytics(&query.name)?;
1151 let existing = self.inner.db.collection_contract(&query.name);
1152 let enabled: std::collections::BTreeSet<crate::catalog::AnalyticsOutput> =
1153 existing
1154 .as_ref()
1155 .map(|c| c.analytics_config.iter().map(|v| v.output).collect())
1156 .unwrap_or_default();
1157 for view in views {
1158 if enabled.contains(&view.output) {
1159 messages.push(format!(
1162 "analytics '{}' already enabled on '{}'",
1163 view.output.as_str(),
1164 query.name
1165 ));
1166 } else {
1167 messages.push(format!(
1168 "analytics '{}' enabled on '{}'",
1169 view.output.as_str(),
1170 query.name
1171 ));
1172 }
1173 }
1174 }
1175 AlterOperation::DropAnalytics(output) => {
1176 self.require_graph_for_analytics(&query.name)?;
1177 let enabled = self
1178 .inner
1179 .db
1180 .collection_contract(&query.name)
1181 .map(|c| c.analytics_config.iter().any(|v| v.output == *output))
1182 .unwrap_or(false);
1183 if !enabled {
1184 return Err(RedDBError::Query(format!(
1187 "ALTER GRAPH DROP ANALYTICS: analytics output '{}' is not enabled on graph '{}'",
1188 output.as_str(),
1189 query.name
1190 )));
1191 }
1192 messages.push(format!(
1193 "analytics '{}' disabled on '{}'",
1194 output.as_str(),
1195 query.name
1196 ));
1197 }
1198 }
1199 }
1200
1201 let mut contract = self
1202 .inner
1203 .db
1204 .collection_contract(&query.name)
1205 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1206 apply_alter_operations_to_contract(&mut contract, &query.operations);
1207 contract.version = contract.version.saturating_add(1);
1208 contract.updated_at_unix_ms = current_unix_ms();
1209 self.inner
1210 .db
1211 .save_collection_contract(contract)
1212 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1213 if !fields_added.is_empty() || !fields_removed.is_empty() {
1217 let sub_names: Vec<String> = self
1218 .inner
1219 .db
1220 .collection_contract(&query.name)
1221 .map(|c| {
1222 c.subscriptions
1223 .iter()
1224 .filter(|s| s.enabled)
1225 .map(|s| s.name.clone())
1226 .collect()
1227 })
1228 .unwrap_or_default();
1229 if !sub_names.is_empty() {
1230 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1231 collection: query.name.clone(),
1232 subscription_names: sub_names.join(", "),
1233 fields_added: fields_added.join(", "),
1234 fields_removed: fields_removed.join(", "),
1235 lsn: self.cdc_current_lsn(),
1236 }
1237 .emit_global();
1238 }
1239 }
1240
1241 self.clear_table_planner_stats(&query.name);
1242 self.invalidate_result_cache();
1243 let post_alter_columns: Vec<String> = self
1248 .inner
1249 .db
1250 .collection_contract(&query.name)
1251 .map(|contract| {
1252 contract
1253 .declared_columns
1254 .iter()
1255 .map(|col| col.name.clone())
1256 .collect()
1257 })
1258 .unwrap_or_default();
1259 self.schema_vocabulary_apply(
1260 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1261 collection: query.name.clone(),
1262 columns: post_alter_columns,
1263 type_tags: Vec::new(),
1264 description: None,
1265 },
1266 );
1267
1268 let message = if messages.is_empty() {
1269 format!("table '{}' altered (no operations)", query.name)
1270 } else {
1271 format!("table '{}' altered: {}", query.name, messages.join(", "))
1272 };
1273
1274 Ok(RuntimeQueryResult::ok_message(
1275 raw_query.to_string(),
1276 &message,
1277 "alter",
1278 ))
1279 }
1280
1281 pub fn execute_explain_alter(
1288 &self,
1289 raw_query: &str,
1290 query: &ExplainAlterQuery,
1291 ) -> RedDBResult<RuntimeQueryResult> {
1292 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1296
1297 let current_contract = self.inner.db.collection_contract(&query.target.name);
1298
1299 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1300 .as_ref()
1301 .map(|c| c.declared_columns.clone())
1302 .unwrap_or_default();
1303
1304 let diff = super::schema_diff::compute_column_diff(
1305 &query.target.name,
1306 ¤t_columns,
1307 &query.target.columns,
1308 );
1309
1310 let rendered = match query.format {
1311 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1312 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1313 };
1314
1315 let format_label = match query.format {
1316 ExplainFormat::Sql => "sql",
1317 ExplainFormat::Json => "json",
1318 };
1319
1320 let columns = vec![
1321 "table".to_string(),
1322 "format".to_string(),
1323 "diff".to_string(),
1324 ];
1325 let row = vec![
1326 ("table".to_string(), Value::text(query.target.name.clone())),
1327 ("format".to_string(), Value::text(format_label.to_string())),
1328 ("diff".to_string(), Value::text(rendered)),
1329 ];
1330
1331 Ok(RuntimeQueryResult::ok_records(
1332 raw_query.to_string(),
1333 columns,
1334 vec![row],
1335 "explain",
1336 ))
1337 }
1338
1339 pub fn execute_create_index(
1344 &self,
1345 raw_query: &str,
1346 query: &CreateIndexQuery,
1347 ) -> RedDBResult<RuntimeQueryResult> {
1348 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1349 let store = self.inner.db.store();
1350
1351 let manager = store
1353 .get_collection(&query.table)
1354 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1355
1356 let method_kind = match query.method {
1357 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1358 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1359 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1360 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1361 };
1362
1363 let entities = manager.query_all(|_| true);
1373 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1374 entities
1375 .iter()
1376 .map(|e| {
1377 let fields = match &e.data {
1378 crate::storage::EntityData::Row(row) => {
1379 if let Some(ref named) = row.named {
1380 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1381 } else if let Some(ref schema) = row.schema {
1382 schema
1386 .iter()
1387 .zip(row.columns.iter())
1388 .map(|(k, v)| (k.clone(), v.clone()))
1389 .collect()
1390 } else {
1391 Vec::new()
1392 }
1393 }
1394 crate::storage::EntityData::Node(node) => node
1395 .properties
1396 .iter()
1397 .map(|(k, v)| (k.clone(), v.clone()))
1398 .collect(),
1399 _ => Vec::new(),
1400 };
1401 (e.id, fields)
1402 })
1403 .collect();
1404
1405 let indexed_count = self
1407 .inner
1408 .index_store
1409 .create_index(
1410 &query.name,
1411 &query.table,
1412 &query.columns,
1413 method_kind,
1414 query.unique,
1415 &entity_fields,
1416 )
1417 .map_err(RedDBError::Internal)?;
1418
1419 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1420 &query.table,
1421 &entity_fields,
1422 );
1423 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1424 self.invalidate_plan_cache();
1425
1426 self.inner
1428 .index_store
1429 .register(super::index_store::RegisteredIndex {
1430 name: query.name.clone(),
1431 collection: query.table.clone(),
1432 columns: query.columns.clone(),
1433 method: method_kind,
1434 unique: query.unique,
1435 });
1436 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1440 collection: query.table.clone(),
1441 index: query.name.clone(),
1442 columns: query.columns.clone(),
1443 });
1444
1445 let method_str = format!("{}", query.method);
1446 let unique_str = if query.unique { "unique " } else { "" };
1447 let cols = query.columns.join(", ");
1448
1449 Ok(RuntimeQueryResult::ok_message(
1450 raw_query.to_string(),
1451 &format!(
1452 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1453 unique_str, query.name, query.table, cols, method_str, indexed_count
1454 ),
1455 "create",
1456 ))
1457 }
1458
1459 pub fn execute_drop_index(
1463 &self,
1464 raw_query: &str,
1465 query: &DropIndexQuery,
1466 ) -> RedDBResult<RuntimeQueryResult> {
1467 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1468 let store = self.inner.db.store();
1469
1470 if store.get_collection(&query.table).is_none() {
1472 if query.if_exists {
1473 return Ok(RuntimeQueryResult::ok_message(
1474 raw_query.to_string(),
1475 &format!("table '{}' does not exist", query.table),
1476 "drop",
1477 ));
1478 }
1479 return Err(RedDBError::NotFound(format!(
1480 "table '{}' not found",
1481 query.table
1482 )));
1483 }
1484
1485 self.inner.index_store.drop_index(&query.name, &query.table);
1487 self.invalidate_plan_cache();
1488 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1490 collection: query.table.clone(),
1491 index: query.name.clone(),
1492 });
1493
1494 Ok(RuntimeQueryResult::ok_message(
1495 raw_query.to_string(),
1496 &format!("index '{}' dropped from '{}'", query.name, query.table),
1497 "drop",
1498 ))
1499 }
1500
1501 fn execute_drop_typed_collection(
1502 &self,
1503 raw_query: &str,
1504 name: &str,
1505 if_exists: bool,
1506 expected_model: CollectionModel,
1507 label: &str,
1508 ) -> RedDBResult<RuntimeQueryResult> {
1509 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1510 if is_system_schema_name(name) {
1511 return Err(RedDBError::Query("system schema is read-only".to_string()));
1512 }
1513 let store = self.inner.db.store();
1514 if store.get_collection(name).is_none() {
1515 if if_exists {
1516 return Ok(RuntimeQueryResult::ok_message(
1517 raw_query.to_string(),
1518 &format!("{label} '{name}' does not exist"),
1519 "drop",
1520 ));
1521 }
1522 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1523 }
1524
1525 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1526 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1527 self.drop_collection_storage(raw_query, name, label)
1528 }
1529
1530 pub fn execute_truncate(
1531 &self,
1532 raw_query: &str,
1533 query: &TruncateQuery,
1534 ) -> RedDBResult<RuntimeQueryResult> {
1535 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1536 if is_system_schema_name(&query.name) {
1537 return Err(RedDBError::Query("system schema is read-only".to_string()));
1538 }
1539
1540 let label = query
1541 .model
1542 .map(polymorphic_resolver::model_name)
1543 .unwrap_or("collection");
1544 let store = self.inner.db.store();
1545 if store.get_collection(&query.name).is_none() {
1546 if query.if_exists {
1547 return Ok(RuntimeQueryResult::ok_message(
1548 raw_query.to_string(),
1549 &format!("{label} '{}' does not exist", query.name),
1550 "truncate",
1551 ));
1552 }
1553 return Err(RedDBError::NotFound(format!(
1554 "{label} '{}' not found",
1555 query.name
1556 )));
1557 }
1558
1559 let actual =
1560 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1561 if let Some(expected) = query.model {
1562 polymorphic_resolver::ensure_model_match(expected, actual)?;
1563 }
1564
1565 if actual == CollectionModel::Queue {
1566 return self.execute_queue_command(
1567 raw_query,
1568 &QueueCommand::Purge {
1569 queue: query.name.clone(),
1570 },
1571 );
1572 }
1573
1574 let affected = self.truncate_collection_entities(&query.name)?;
1576 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1578 self.inner.db.invalidate_vector_index(&query.name);
1579 self.clear_table_planner_stats(&query.name);
1580 self.invalidate_result_cache();
1581
1582 Ok(RuntimeQueryResult::ok_message(
1583 raw_query.to_string(),
1584 &format!(
1585 "{affected} entities truncated from {label} '{}'",
1586 query.name
1587 ),
1588 "truncate",
1589 ))
1590 }
1591
1592 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1593 let store = self.inner.db.store();
1594 let Some(manager) = store.get_collection(name) else {
1595 return Ok(0);
1596 };
1597 let entities = manager.query_all(|_| true);
1598 if entities.is_empty() {
1599 return Ok(0);
1600 }
1601
1602 for entity in &entities {
1603 let fields = entity_index_fields(&entity.data);
1604 self.inner
1605 .index_store
1606 .index_entity_delete(name, entity.id, &fields)
1607 .map_err(RedDBError::Internal)?;
1608 }
1609
1610 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1611 let deleted_ids = store
1612 .delete_batch(name, &ids)
1613 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1614 for id in &deleted_ids {
1615 store.context_index().remove_entity(*id);
1616 }
1617 Ok(deleted_ids.len() as u64)
1618 }
1619
    /// Tears down collection `name` and everything registered for it, then
    /// returns a "`{label} '{name}' dropped`" message.
    ///
    /// The steps run in a fixed order: emit the collection-dropped event with
    /// the final entity count, drop the collection's indices, drop the
    /// collection from the store, clear the vector index / default TTL /
    /// contract / planner stats / result cache / auth visibility cache,
    /// persist metadata, and finally notify the schema vocabulary.
    ///
    /// # Errors
    ///
    /// Propagates the error from emitting the dropped event, and maps
    /// failures of `drop_collection`, `remove_collection_contract` and
    /// `persist_metadata` to [`RedDBError::Internal`]. Steps already performed
    /// before a failure are not rolled back here.
    fn drop_collection_storage(
        &self,
        raw_query: &str,
        name: &str,
        label: &str,
    ) -> RedDBResult<RuntimeQueryResult> {
        let store = self.inner.db.store();

        // Count the entities while the collection still exists so the event
        // can report it; a missing collection counts as zero.
        let final_count = store
            .get_collection(name)
            .map(|manager| manager.query_all(|_| true).len() as u64)
            .unwrap_or(0);
        // The event is emitted before any storage is removed; if emitting
        // fails, nothing has been dropped yet.
        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
            self,
            name,
            final_count,
        )?;

        // Collect the index names first, then drop each one, so no index
        // outlives its collection.
        let orphaned_indices: Vec<String> = self
            .inner
            .index_store
            .list_indices(name)
            .into_iter()
            .map(|index| index.name)
            .collect();
        for index_name in &orphaned_indices {
            self.inner.index_store.drop_index(index_name, name);
        }

        store
            .drop_collection(name)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        // Remove per-collection state and caches that referred to the collection.
        self.inner.db.invalidate_vector_index(name);
        self.inner.db.clear_collection_default_ttl_ms(name);
        self.inner
            .db
            .remove_collection_contract(name)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        self.clear_table_planner_stats(name);
        self.invalidate_result_cache();
        // Only relevant when an auth store is configured.
        if let Some(store) = self.inner.auth_store.read().clone() {
            store.invalidate_visible_collections_cache();
        }
        self.inner
            .db
            .persist_metadata()
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        self.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
                collection: name.to_string(),
            },
        );

        Ok(RuntimeQueryResult::ok_message(
            raw_query.to_string(),
            &format!("{label} '{name}' dropped"),
            "drop",
        ))
    }
1681}
1682
/// Returns `true` when `name` belongs to the reserved system schema: the bare
/// name `red`, anything under the `red.` namespace, or anything starting with
/// `__red_schema_`.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|&prefix| name.starts_with(prefix))
}
1686
1687fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1688 match data {
1689 EntityData::Row(row) => {
1690 if let Some(ref named) = row.named {
1691 named
1692 .iter()
1693 .map(|(key, value)| (key.clone(), value.clone()))
1694 .collect()
1695 } else if let Some(ref schema) = row.schema {
1696 schema
1697 .iter()
1698 .zip(row.columns.iter())
1699 .map(|(key, value)| (key.clone(), value.clone()))
1700 .collect()
1701 } else {
1702 Vec::new()
1703 }
1704 }
1705 EntityData::Node(node) => node
1706 .properties
1707 .iter()
1708 .map(|(key, value)| (key.clone(), value.clone()))
1709 .collect(),
1710 _ => Vec::new(),
1711 }
1712}
1713
1714fn collection_contract_from_create_table(
1715 query: &CreateTableQuery,
1716) -> RedDBResult<crate::physical::CollectionContract> {
1717 let now = current_unix_ms();
1718 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1719 .columns
1720 .iter()
1721 .map(declared_column_contract_from_ddl)
1722 .collect();
1723 if query.timestamps {
1724 declared_columns.push(crate::physical::DeclaredColumnContract {
1728 name: "created_at".to_string(),
1729 data_type: "BIGINT".to_string(),
1730 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1731 not_null: true,
1732 default: None,
1733 compress: None,
1734 unique: false,
1735 primary_key: false,
1736 enum_variants: Vec::new(),
1737 array_element: None,
1738 decimal_precision: None,
1739 });
1740 declared_columns.push(crate::physical::DeclaredColumnContract {
1741 name: "updated_at".to_string(),
1742 data_type: "BIGINT".to_string(),
1743 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1744 not_null: true,
1745 default: None,
1746 compress: None,
1747 unique: false,
1748 primary_key: false,
1749 enum_variants: Vec::new(),
1750 array_element: None,
1751 decimal_precision: None,
1752 });
1753 }
1754 Ok(crate::physical::CollectionContract {
1755 name: query.name.clone(),
1756 declared_model: crate::catalog::CollectionModel::Table,
1757 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1758 origin: crate::physical::ContractOrigin::Explicit,
1759 version: 1,
1760 created_at_unix_ms: now,
1761 updated_at_unix_ms: now,
1762 default_ttl_ms: query.default_ttl_ms,
1763 vector_dimension: None,
1764 vector_metric: None,
1765 context_index_fields: query.context_index_fields.clone(),
1766 declared_columns,
1767 table_def: Some(build_table_def_from_create_table(query)?),
1768 timestamps_enabled: query.timestamps,
1769 context_index_enabled: query.context_index_enabled
1770 || !query.context_index_fields.is_empty(),
1771 metrics_raw_retention_ms: None,
1772 metrics_rollup_policies: Vec::new(),
1773 metrics_tenant_identity: None,
1774 metrics_namespace: None,
1775 append_only: query.append_only,
1776 subscriptions: query.subscriptions.clone(),
1777 analytics_config: Vec::new(),
1778 session_key: None,
1779 session_gap_ms: None,
1780 retention_duration_ms: None,
1781 analytical_storage: None,
1782 })
1783}
1784
1785fn default_collection_contract_for_existing_table(
1786 name: &str,
1787) -> crate::physical::CollectionContract {
1788 let now = current_unix_ms();
1789 crate::physical::CollectionContract {
1790 name: name.to_string(),
1791 declared_model: crate::catalog::CollectionModel::Table,
1792 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1793 origin: crate::physical::ContractOrigin::Explicit,
1794 version: 0,
1795 created_at_unix_ms: now,
1796 updated_at_unix_ms: now,
1797 default_ttl_ms: None,
1798 vector_dimension: None,
1799 vector_metric: None,
1800 context_index_fields: Vec::new(),
1801 declared_columns: Vec::new(),
1802 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1803 timestamps_enabled: false,
1804 context_index_enabled: false,
1805 metrics_raw_retention_ms: None,
1806 metrics_rollup_policies: Vec::new(),
1807 metrics_tenant_identity: None,
1808 metrics_namespace: None,
1809 append_only: false,
1810 subscriptions: Vec::new(),
1811 analytics_config: Vec::new(),
1812 session_key: None,
1813 session_gap_ms: None,
1814 retention_duration_ms: None,
1815 analytical_storage: None,
1816 }
1817}
1818
1819fn keyed_collection_contract(
1820 name: &str,
1821 model: crate::catalog::CollectionModel,
1822 analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
1823) -> crate::physical::CollectionContract {
1824 let now = current_unix_ms();
1825 crate::physical::CollectionContract {
1826 name: name.to_string(),
1827 declared_model: model,
1828 schema_mode: crate::catalog::SchemaMode::Dynamic,
1829 origin: crate::physical::ContractOrigin::Explicit,
1830 version: 1,
1831 created_at_unix_ms: now,
1832 updated_at_unix_ms: now,
1833 default_ttl_ms: None,
1834 vector_dimension: None,
1835 vector_metric: None,
1836 context_index_fields: Vec::new(),
1837 declared_columns: Vec::new(),
1838 table_def: None,
1839 timestamps_enabled: false,
1840 context_index_enabled: false,
1841 metrics_raw_retention_ms: None,
1842 metrics_rollup_policies: Vec::new(),
1843 metrics_tenant_identity: None,
1844 metrics_namespace: None,
1845 append_only: false,
1846 subscriptions: Vec::new(),
1847 analytics_config,
1848 session_key: None,
1849 session_gap_ms: None,
1850 retention_duration_ms: None,
1851 analytical_storage: None,
1852 }
1853}
1854
1855fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1856 let now = current_unix_ms();
1857 crate::physical::CollectionContract {
1858 name: query.name.clone(),
1859 declared_model: crate::catalog::CollectionModel::Metrics,
1860 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1861 origin: crate::physical::ContractOrigin::Explicit,
1862 version: 1,
1863 created_at_unix_ms: now,
1864 updated_at_unix_ms: now,
1865 default_ttl_ms: query.default_ttl_ms,
1866 vector_dimension: None,
1867 vector_metric: None,
1868 context_index_fields: Vec::new(),
1869 declared_columns: Vec::new(),
1870 table_def: None,
1871 timestamps_enabled: false,
1872 context_index_enabled: false,
1873 metrics_raw_retention_ms: query.default_ttl_ms,
1874 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1875 metrics_tenant_identity: Some(
1876 query
1877 .tenant_by
1878 .clone()
1879 .unwrap_or_else(|| "current_tenant".to_string()),
1880 ),
1881 metrics_namespace: Some("default".to_string()),
1882 append_only: true,
1883 subscriptions: Vec::new(),
1884 analytics_config: Vec::new(),
1885 session_key: None,
1886 session_gap_ms: None,
1887 retention_duration_ms: None,
1888 analytical_storage: None,
1889 }
1890}
1891
1892fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1893 let now = current_unix_ms();
1894 crate::physical::CollectionContract {
1895 name: query.name.clone(),
1896 declared_model: crate::catalog::CollectionModel::Vector,
1897 schema_mode: crate::catalog::SchemaMode::Dynamic,
1898 origin: crate::physical::ContractOrigin::Explicit,
1899 version: 1,
1900 created_at_unix_ms: now,
1901 updated_at_unix_ms: now,
1902 default_ttl_ms: None,
1903 vector_dimension: Some(query.dimension),
1904 vector_metric: Some(query.metric),
1905 context_index_fields: Vec::new(),
1906 declared_columns: Vec::new(),
1907 table_def: None,
1908 timestamps_enabled: false,
1909 context_index_enabled: false,
1910 metrics_raw_retention_ms: None,
1911 metrics_rollup_policies: Vec::new(),
1912 metrics_tenant_identity: None,
1913 metrics_namespace: None,
1914 append_only: false,
1915 subscriptions: Vec::new(),
1916 analytics_config: Vec::new(),
1917 session_key: None,
1918 session_gap_ms: None,
1919 retention_duration_ms: None,
1920 analytical_storage: None,
1921 }
1922}
1923
1924fn declared_column_contract_from_ddl(
1925 column: &CreateColumnDef,
1926) -> crate::physical::DeclaredColumnContract {
1927 crate::physical::DeclaredColumnContract {
1928 name: column.name.clone(),
1929 data_type: column.data_type.clone(),
1930 sql_type: Some(column.sql_type.clone()),
1931 not_null: column.not_null,
1932 default: column.default.clone(),
1933 compress: column.compress,
1934 unique: column.unique,
1935 primary_key: column.primary_key,
1936 enum_variants: column.enum_variants.clone(),
1937 array_element: column.array_element.clone(),
1938 decimal_precision: column.decimal_precision,
1939 }
1940}
1941
/// Applies `operations`, in order, to the in-memory `contract`.
///
/// This only mutates the contract value (declared columns, table definition,
/// append-only flag, subscriptions, retention, analytics config). It does not
/// persist anything and does not bump the contract version; callers do that.
/// Operations that have no representation in the contract are accepted and
/// ignored here.
///
/// A missing `table_def` is replaced by an empty one named after the
/// contract before any operation is applied.
fn apply_alter_operations_to_contract(
    contract: &mut crate::physical::CollectionContract,
    operations: &[AlterOperation],
) {
    // Guarantee a table definition exists so the column operations below can
    // keep it in sync with `declared_columns`.
    if contract.table_def.is_none() {
        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
    }
    for operation in operations {
        match operation {
            AlterOperation::AddColumn(column) => {
                // Adding a column that is already declared is a no-op for
                // `declared_columns`.
                if !contract
                    .declared_columns
                    .iter()
                    .any(|existing| existing.name == column.name)
                {
                    contract
                        .declared_columns
                        .push(declared_column_contract_from_ddl(column));
                }
                if let Some(table_def) = contract.table_def.as_mut() {
                    if table_def.get_column(&column.name).is_none() {
                        // If the DDL column cannot be converted, the table
                        // definition is left untouched (the conversion error
                        // is dropped); `declared_columns` may already have
                        // been updated above.
                        if let Ok(column_def) = column_def_from_ddl(column) {
                            if column.primary_key {
                                table_def.primary_key.push(column.name.clone());
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("pk_{}", column.name),
                                        crate::storage::schema::ConstraintType::PrimaryKey,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            if column.unique {
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("uniq_{}", column.name),
                                        crate::storage::schema::ConstraintType::Unique,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            if column.not_null {
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("not_null_{}", column.name),
                                        crate::storage::schema::ConstraintType::NotNull,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            table_def.columns.push(column_def);
                        }
                    }
                }
            }
            AlterOperation::DropColumn(name) => {
                contract
                    .declared_columns
                    .retain(|column| column.name != *name);
                if let Some(table_def) = contract.table_def.as_mut() {
                    if let Some(index) = table_def.column_index(name) {
                        table_def.columns.remove(index);
                    }
                    // Also remove everything that mentions the dropped
                    // column: primary-key membership, and any constraint or
                    // index that lists it among its columns.
                    table_def.primary_key.retain(|column| column != name);
                    table_def.constraints.retain(|constraint| {
                        !constraint.columns.iter().any(|column| column == name)
                    });
                    table_def
                        .indexes
                        .retain(|index| !index.columns.iter().any(|column| column == name));
                }
            }
            AlterOperation::RenameColumn { from, to } => {
                // Refuse to rename onto a name that is already declared; the
                // operation is skipped silently.
                if contract
                    .declared_columns
                    .iter()
                    .any(|column| column.name == *to)
                {
                    continue;
                }
                if let Some(column) = contract
                    .declared_columns
                    .iter_mut()
                    .find(|column| column.name == *from)
                {
                    column.name = to.clone();
                }
                if let Some(table_def) = contract.table_def.as_mut() {
                    if let Some(column) = table_def
                        .columns
                        .iter_mut()
                        .find(|column| column.name == *from)
                    {
                        column.name = to.clone();
                    }
                    // Rewrite every reference to the old name: primary key,
                    // constraint columns, constraint `ref_columns`, and
                    // index columns.
                    for primary_key in &mut table_def.primary_key {
                        if *primary_key == *from {
                            *primary_key = to.clone();
                        }
                    }
                    for constraint in &mut table_def.constraints {
                        for column in &mut constraint.columns {
                            if *column == *from {
                                *column = to.clone();
                            }
                        }
                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
                            for column in ref_columns {
                                if *column == *from {
                                    *column = to.clone();
                                }
                            }
                        }
                    }
                    for index in &mut table_def.indexes {
                        for column in &mut index.columns {
                            if *column == *from {
                                *column = to.clone();
                            }
                        }
                    }
                }
            }
            // The following operations leave the contract unchanged in this
            // function. NOTE(review): presumably they are applied elsewhere
            // by the ALTER executor — confirm against the caller.
            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
            AlterOperation::SetAppendOnly(on) => {
                contract.append_only = *on;
            }
            AlterOperation::SetVersioned(_) => {}
            AlterOperation::EnableEvents(subscription) => {
                // The subscription always points at this collection and is
                // stored enabled. An existing subscription with the same
                // target queue is replaced rather than duplicated.
                let mut subscription = subscription.clone();
                subscription.source = contract.name.clone();
                subscription.enabled = true;
                if let Some(existing) = contract
                    .subscriptions
                    .iter_mut()
                    .find(|existing| existing.target_queue == subscription.target_queue)
                {
                    *existing = subscription;
                } else {
                    contract.subscriptions.push(subscription);
                }
            }
            AlterOperation::DisableEvents => {
                // Subscriptions are kept but switched off.
                for subscription in &mut contract.subscriptions {
                    subscription.enabled = false;
                }
            }
            AlterOperation::AddSubscription { name, descriptor } => {
                // Named subscriptions are keyed by name (not by target
                // queue): an existing one with the same name is replaced.
                let mut sub = descriptor.clone();
                sub.name = name.clone();
                sub.source = contract.name.clone();
                sub.enabled = true;
                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
                {
                    *existing = sub;
                } else {
                    contract.subscriptions.push(sub);
                }
            }
            AlterOperation::DropSubscription { name } => {
                contract.subscriptions.retain(|s| s.name != *name);
            }
            // Signer changes are not recorded in the contract here.
            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
            AlterOperation::SetRetention { duration_ms } => {
                contract.retention_duration_ms = Some(*duration_ms);
            }
            AlterOperation::UnsetRetention => {
                contract.retention_duration_ms = None;
            }
            AlterOperation::AddAnalytics(views) => {
                // Only outputs that are not enabled yet are appended; an
                // already-enabled output keeps its existing descriptor.
                for view in views {
                    if !contract
                        .analytics_config
                        .iter()
                        .any(|existing| existing.output == view.output)
                    {
                        contract.analytics_config.push(view.clone());
                    }
                }
            }
            AlterOperation::DropAnalytics(output) => {
                contract
                    .analytics_config
                    .retain(|view| view.output != *output);
            }
        }
    }
}
2151
2152pub(crate) fn retention_timestamp_column_exists(
2157 contract: &crate::physical::CollectionContract,
2158) -> bool {
2159 if contract.timestamps_enabled {
2160 return true;
2161 }
2162 if matches!(
2163 contract.declared_model,
2164 crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
2165 ) {
2166 return true;
2170 }
2171 contract
2172 .declared_columns
2173 .iter()
2174 .any(|column| is_temporal_data_type(&column.data_type))
2175}
2176
/// Returns `true` when `data_type` names one of the temporal column types:
/// `TIMESTAMP`, `TIMESTAMPMS`, `TIMESTAMP_MS`, `DATETIME` or `DATE`.
///
/// The comparison ignores ASCII case and requires an exact match, so
/// surrounding whitespace or type parameters make it return `false`.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];

    TEMPORAL_TYPES
        .iter()
        .any(|&candidate| data_type.eq_ignore_ascii_case(candidate))
}
2184
2185fn validate_event_subscriptions(
2186 runtime: &RedDBRuntime,
2187 source: &str,
2188 subscriptions: &[crate::catalog::SubscriptionDescriptor],
2189) -> RedDBResult<()> {
2190 for subscription in subscriptions
2191 .iter()
2192 .filter(|subscription| subscription.enabled)
2193 {
2194 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2195 return Err(RedDBError::Query(
2196 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2197 ));
2198 }
2199 validate_subscription_auth(runtime, source, subscription)?;
2200 if subscription.target_queue == source
2201 || subscription_would_create_cycle(
2202 &runtime.inner.db,
2203 source,
2204 &subscription.target_queue,
2205 )
2206 {
2207 return Err(RedDBError::Query(
2208 "subscription would create cycle".to_string(),
2209 ));
2210 }
2211 audit_subscription_redact_gap(runtime, source, subscription);
2212 }
2213 Ok(())
2214}
2215
2216fn validate_subscription_auth(
2217 runtime: &RedDBRuntime,
2218 source: &str,
2219 subscription: &crate::catalog::SubscriptionDescriptor,
2220) -> RedDBResult<()> {
2221 let auth_store = match runtime.inner.auth_store.read().clone() {
2222 Some(store) => store,
2223 None => return Ok(()),
2224 };
2225 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2226 Some(identity) => identity,
2227 None => return Ok(()),
2228 };
2229 let tenant = crate::runtime::impl_core::current_tenant();
2230 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2231
2232 if auth_store.iam_authorization_enabled() {
2233 let ctx = crate::auth::policies::EvalContext {
2234 principal_tenant: tenant.clone(),
2235 current_tenant: tenant.clone(),
2236 peer_ip: None,
2237 mfa_present: false,
2238 now_ms: crate::auth::now_ms(),
2239 principal_is_admin_role: role == crate::auth::Role::Admin,
2240 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2241 principal_is_platform_scoped: principal.tenant.is_none(),
2242 };
2243 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2244 if let Some(t) = tenant.as_deref() {
2245 source_resource = source_resource.with_tenant(t.to_string());
2246 }
2247 if !auth_store.check_policy_authz_with_role(
2248 &principal,
2249 "select",
2250 &source_resource,
2251 &ctx,
2252 role,
2253 ) {
2254 return Err(RedDBError::Query(format!(
2255 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2256 principal, source_resource.kind, source_resource.name
2257 )));
2258 }
2259
2260 let mut target_resource =
2261 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2262 if let Some(t) = tenant.as_deref() {
2263 target_resource = target_resource.with_tenant(t.to_string());
2264 }
2265 if !auth_store.check_policy_authz_with_role(
2266 &principal,
2267 "write",
2268 &target_resource,
2269 &ctx,
2270 role,
2271 ) {
2272 return Err(RedDBError::Query(format!(
2273 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2274 principal, target_resource.kind, target_resource.name
2275 )));
2276 }
2277 return Ok(());
2278 }
2279
2280 let ctx = crate::auth::privileges::AuthzContext {
2281 principal: &username,
2282 effective_role: role,
2283 tenant: tenant.as_deref(),
2284 };
2285 auth_store
2286 .check_grant(
2287 &ctx,
2288 crate::auth::privileges::Action::Select,
2289 &crate::auth::privileges::Resource::table_from_name(source),
2290 )
2291 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2292 auth_store
2293 .check_grant(
2294 &ctx,
2295 crate::auth::privileges::Action::Insert,
2296 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2297 )
2298 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2299 Ok(())
2300}
2301
2302fn audit_subscription_redact_gap(
2303 runtime: &RedDBRuntime,
2304 source: &str,
2305 subscription: &crate::catalog::SubscriptionDescriptor,
2306) {
2307 let auth_store = match runtime.inner.auth_store.read().clone() {
2308 Some(store) if store.iam_authorization_enabled() => store,
2309 _ => return,
2310 };
2311 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2312 Some(identity) => identity,
2313 None => return,
2314 };
2315 let tenant = crate::runtime::impl_core::current_tenant();
2316 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2317 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2318 if missing.is_empty() {
2319 return;
2320 }
2321
2322 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2323 tracing::warn!(
2324 target: "reddb::operator",
2325 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2326 source,
2327 subscription.target_queue,
2328 columns
2329 );
2330 let mut event = AuditEvent::builder("subscription_redact_gap")
2331 .principal(username)
2332 .source(AuditAuthSource::System)
2333 .resource(format!(
2334 "subscription:{}->{}",
2335 source, subscription.target_queue
2336 ))
2337 .outcome(Outcome::Success)
2338 .field(AuditFieldEscaper::field("source", source))
2339 .field(AuditFieldEscaper::field(
2340 "target_queue",
2341 subscription.target_queue.clone(),
2342 ))
2343 .field(AuditFieldEscaper::field(
2344 "subscription",
2345 subscription.name.clone(),
2346 ))
2347 .field(AuditFieldEscaper::field("columns", columns))
2348 .field(AuditFieldEscaper::field("role", role.as_str()));
2349 if let Some(t) = tenant {
2350 event = event.tenant(t);
2351 }
2352 runtime.inner.audit_log.record_event(event.build());
2353}
2354
2355fn subscription_redact_gap_columns(
2356 auth_store: &crate::auth::store::AuthStore,
2357 principal: &crate::auth::UserId,
2358 source: &str,
2359 subscription: &crate::catalog::SubscriptionDescriptor,
2360) -> BTreeSet<String> {
2361 let redacted: HashSet<String> = subscription
2362 .redact_fields
2363 .iter()
2364 .map(|field| field.to_ascii_lowercase())
2365 .collect();
2366 auth_store
2367 .effective_policies(principal)
2368 .iter()
2369 .flat_map(|policy| policy.statements.iter())
2370 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2371 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2372 .flat_map(|statement| statement.resources.iter())
2373 .filter_map(|resource| denied_column_for_source(resource, source))
2374 .filter(|column| !redact_covers_column(&redacted, source, column))
2375 .collect()
2376}
2377
2378fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2379 match pattern {
2380 crate::auth::policies::ActionPattern::Wildcard => true,
2381 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2382 crate::auth::policies::ActionPattern::Prefix(prefix) => {
2383 "select".len() > prefix.len() + 1
2384 && "select".starts_with(prefix)
2385 && "select".as_bytes()[prefix.len()] == b':'
2386 }
2387 }
2388}
2389
2390fn denied_column_for_source(
2391 resource: &crate::auth::policies::ResourcePattern,
2392 source: &str,
2393) -> Option<String> {
2394 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2395 return None;
2396 };
2397 if kind != "column" {
2398 return None;
2399 }
2400 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2401 (column.table_resource_name() == source).then_some(column.column)
2402}
2403
/// Returns whether the redact list covers `column` of `source`.
///
/// A column is covered when the list contains `*`, the lowercased column
/// name, or the lowercased `source.column` form. `redacted` is looked up
/// verbatim, so its entries are expected to be lowercase already.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
2409
2410fn subscription_would_create_cycle(
2411 db: &crate::storage::unified::devx::RedDB,
2412 source: &str,
2413 target: &str,
2414) -> bool {
2415 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2416 for contract in db.collection_contracts() {
2417 for subscription in contract
2418 .subscriptions
2419 .into_iter()
2420 .filter(|subscription| subscription.enabled)
2421 {
2422 graph
2423 .entry(subscription.source)
2424 .or_default()
2425 .push(subscription.target_queue);
2426 }
2427 }
2428 graph
2429 .entry(source.to_string())
2430 .or_default()
2431 .push(target.to_string());
2432
2433 let mut stack = vec![target.to_string()];
2434 let mut seen = HashSet::new();
2435 while let Some(node) = stack.pop() {
2436 if node == source {
2437 return true;
2438 }
2439 if !seen.insert(node.clone()) {
2440 continue;
2441 }
2442 if let Some(next) = graph.get(&node) {
2443 stack.extend(next.iter().cloned());
2444 }
2445 }
2446 false
2447}
2448
/// Crate-visible entry point for `ensure_event_target_queue`.
///
/// Forwards `runtime` and `queue` unchanged and returns the inner result, so
/// callers outside this module can make sure an event target queue exists.
pub(crate) fn ensure_event_target_queue_pub(
    runtime: &RedDBRuntime,
    queue: &str,
) -> RedDBResult<()> {
    ensure_event_target_queue(runtime, queue)
}
2455
2456fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2457 let store = runtime.inner.db.store();
2458 if store.get_collection(queue).is_some() {
2459 return Ok(());
2460 }
2461 store
2462 .create_collection(queue)
2463 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2464 runtime
2465 .inner
2466 .db
2467 .save_collection_contract(event_queue_collection_contract(queue))
2468 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2469 store.set_config_tree(
2470 &format!("queue.{queue}.mode"),
2471 &crate::serde_json::Value::String("fanout".to_string()),
2472 );
2473 Ok(())
2474}
2475
2476fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2477 let now = current_unix_ms();
2478 crate::physical::CollectionContract {
2479 name: queue.to_string(),
2480 declared_model: crate::catalog::CollectionModel::Queue,
2481 schema_mode: crate::catalog::SchemaMode::Dynamic,
2482 origin: crate::physical::ContractOrigin::Implicit,
2483 version: 1,
2484 created_at_unix_ms: now,
2485 updated_at_unix_ms: now,
2486 default_ttl_ms: None,
2487 vector_dimension: None,
2488 vector_metric: None,
2489 context_index_fields: Vec::new(),
2490 declared_columns: Vec::new(),
2491 table_def: None,
2492 timestamps_enabled: false,
2493 context_index_enabled: false,
2494 metrics_raw_retention_ms: None,
2495 metrics_rollup_policies: Vec::new(),
2496 metrics_tenant_identity: None,
2497 metrics_namespace: None,
2498 append_only: true,
2499 subscriptions: Vec::new(),
2500 analytics_config: Vec::new(),
2501 session_key: None,
2502 session_gap_ms: None,
2503 retention_duration_ms: None,
2504 analytical_storage: None,
2505 }
2506}
2507
2508fn build_table_def_from_create_table(
2509 query: &CreateTableQuery,
2510) -> RedDBResult<crate::storage::schema::TableDef> {
2511 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2512 for column in &query.columns {
2513 if column.primary_key {
2514 table.primary_key.push(column.name.clone());
2515 table.constraints.push(
2516 crate::storage::schema::Constraint::new(
2517 format!("pk_{}", column.name),
2518 crate::storage::schema::ConstraintType::PrimaryKey,
2519 )
2520 .on_columns(vec![column.name.clone()]),
2521 );
2522 }
2523 if column.unique {
2524 table.constraints.push(
2525 crate::storage::schema::Constraint::new(
2526 format!("uniq_{}", column.name),
2527 crate::storage::schema::ConstraintType::Unique,
2528 )
2529 .on_columns(vec![column.name.clone()]),
2530 );
2531 }
2532 if column.not_null {
2533 table.constraints.push(
2534 crate::storage::schema::Constraint::new(
2535 format!("not_null_{}", column.name),
2536 crate::storage::schema::ConstraintType::NotNull,
2537 )
2538 .on_columns(vec![column.name.clone()]),
2539 );
2540 }
2541 table.columns.push(column_def_from_ddl(column)?);
2542 }
2543 if query.timestamps {
2548 table.columns.push(
2549 crate::storage::schema::ColumnDef::new(
2550 "created_at".to_string(),
2551 crate::storage::schema::DataType::UnsignedInteger,
2552 )
2553 .not_null(),
2554 );
2555 table.columns.push(
2556 crate::storage::schema::ColumnDef::new(
2557 "updated_at".to_string(),
2558 crate::storage::schema::DataType::UnsignedInteger,
2559 )
2560 .not_null(),
2561 );
2562 table.constraints.push(
2563 crate::storage::schema::Constraint::new(
2564 "not_null_created_at".to_string(),
2565 crate::storage::schema::ConstraintType::NotNull,
2566 )
2567 .on_columns(vec!["created_at".to_string()]),
2568 );
2569 table.constraints.push(
2570 crate::storage::schema::Constraint::new(
2571 "not_null_updated_at".to_string(),
2572 crate::storage::schema::ConstraintType::NotNull,
2573 )
2574 .on_columns(vec!["updated_at".to_string()]),
2575 );
2576 }
2577 table
2578 .validate()
2579 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2580 Ok(table)
2581}
2582
2583fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2584 let data_type = resolve_declared_data_type(&column.data_type)
2585 .map_err(|err| RedDBError::Query(err.to_string()))?;
2586 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2587 if column.not_null {
2588 column_def = column_def.not_null();
2589 }
2590 if let Some(default) = &column.default {
2591 column_def = column_def.with_default(default.as_bytes().to_vec());
2592 }
2593 if column.compress.unwrap_or(0) > 0 {
2594 column_def = column_def.compressed();
2595 }
2596 if !column.enum_variants.is_empty() {
2597 column_def = column_def.with_variants(column.enum_variants.clone());
2598 }
2599 if let Some(precision) = column.decimal_precision {
2600 column_def = column_def.with_precision(precision);
2601 }
2602 if let Some(element_type) = &column.array_element {
2603 column_def = column_def.with_element_type(
2604 resolve_declared_data_type(element_type)
2605 .map_err(|err| RedDBError::Query(err.to_string()))?,
2606 );
2607 }
2608 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2609 if column.unique {
2610 column_def = column_def.with_metadata("unique", "true");
2611 }
2612 if column.primary_key {
2613 column_def = column_def.with_metadata("primary_key", "true");
2614 }
2615 Ok(column_def)
2616}
2617
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2624
#[cfg(test)]
mod tests {
    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
    use crate::auth::store::{AuthStore, PrincipalRef};
    use crate::auth::UserId;
    use crate::auth::{AuthConfig, Role};
    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
    use crate::storage::schema::Value;
    use crate::{RedDBOptions, RedDBRuntime};
    use std::sync::Arc;

    /// Installs an ambient auth identity and clears it again when dropped.
    ///
    /// Clearing in `Drop` removes the identity even when the guarded statement
    /// panics (for example a failed `unwrap`), not only on the success path.
    struct IdentityGuard;

    impl IdentityGuard {
        /// Sets the current identity to `username` with `role`.
        fn install(username: &str, role: Role) -> Self {
            set_current_auth_identity(username.to_string(), role);
            Self
        }
    }

    impl Drop for IdentityGuard {
        fn drop(&mut self) {
            clear_current_auth_identity();
        }
    }

    /// Builds a single-statement policy allowing `action` on one collection.
    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
        Policy {
            id: id.to_string(),
            version: 1,
            tenant: None,
            created_at: 0,
            updated_at: 0,
            statements: vec![Statement {
                sid: None,
                effect: Effect::Allow,
                actions: vec![ActionPattern::Exact(action.to_string())],
                resources: vec![ResourcePattern::Exact {
                    kind: "collection".to_string(),
                    name: collection.to_string(),
                }],
                condition: None,
            }],
        }
    }

    /// Builds a single-statement policy allowing `action` on every resource.
    fn make_wildcard_allow_policy(id: &str, action: &str) -> Policy {
        Policy {
            id: id.to_string(),
            version: 1,
            tenant: None,
            created_at: 0,
            updated_at: 0,
            statements: vec![Statement {
                sid: None,
                effect: Effect::Allow,
                actions: vec![ActionPattern::Exact(action.to_string())],
                resources: vec![ResourcePattern::Wildcard],
                condition: None,
            }],
        }
    }

    /// Installs a fresh default-configured auth store on the runtime and
    /// returns a handle to it.
    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
        let store = Arc::new(AuthStore::new(AuthConfig::default()));
        *rt.inner.auth_store.write() = Some(store.clone());
        store
    }

    /// A principal holding only a `select` policy is refused `DROP TABLE`.
    #[test]
    fn drop_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        let select_only = make_wildcard_allow_policy("select-only", "select");
        store.put_policy_internal(select_only).unwrap();
        let alice = UserId::from_parts(None, "alice");
        store
            .attach_policy(PrincipalRef::User(alice), "select-only")
            .unwrap();
        let err = {
            let _identity = IdentityGuard::install("alice", Role::Write);
            rt.execute_query("DROP TABLE foo").unwrap_err()
        };
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    /// An explicit `drop` allow on the collection permits `DROP TABLE`.
    #[test]
    fn drop_allowed_with_explicit_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
        store.put_policy_internal(policy).unwrap();
        let bob = UserId::from_parts(None, "bob");
        store
            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
            .unwrap();
        let _identity = IdentityGuard::install("bob", Role::Write);
        rt.execute_query("DROP TABLE bar").unwrap();
    }

    /// A wildcard-resource `drop` allow permits `DROP TABLE`.
    #[test]
    fn drop_allowed_with_wildcard_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        let policy = make_wildcard_allow_policy("allow-drop-all", "drop");
        store.put_policy_internal(policy).unwrap();
        let carl = UserId::from_parts(None, "carl");
        store
            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
            .unwrap();
        let _identity = IdentityGuard::install("carl", Role::Write);
        rt.execute_query("DROP TABLE baz").unwrap();
    }

    /// Under `PolicyOnly` enforcement, a principal holding only a `select`
    /// policy is refused `TRUNCATE TABLE`.
    #[test]
    fn truncate_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        store
            .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
        let select_only = make_wildcard_allow_policy("select-only-2", "select");
        store.put_policy_internal(select_only).unwrap();
        let dana = UserId::from_parts(None, "dana");
        store
            .attach_policy(PrincipalRef::User(dana), "select-only-2")
            .unwrap();
        let err = {
            let _identity = IdentityGuard::install("dana", Role::Write);
            rt.execute_query("TRUNCATE TABLE qux").unwrap_err()
        };
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    /// `TRUNCATE TABLE` removes rows but keeps the contract and the index, and
    /// the table stays writable afterwards.
    #[test]
    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
            .unwrap();
        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
            .unwrap();

        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
        assert_eq!(truncated.statement_type, "truncate");
        assert_eq!(truncated.affected_rows, 0);

        let empty = rt.execute_query("SELECT id FROM users").unwrap();
        assert!(empty.result.records.is_empty());

        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
            .unwrap();
        let selected = rt
            .execute_query("SELECT name FROM users WHERE id = 3")
            .unwrap();
        let name = selected.result.records[0].get("name").unwrap();
        assert_eq!(name, &Value::text("cy"));
        assert!(rt.db().collection_contract("users").is_some());
        assert!(rt
            .inner
            .index_store
            .list_indices("users")
            .iter()
            .any(|index| index.name == "idx_users_id"));
    }

    /// `TRUNCATE TABLE` rejects a queue with a model-mismatch error, while
    /// `TRUNCATE COLLECTION` empties it.
    #[test]
    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE QUEUE tasks").unwrap();
        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();

        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));

        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
        assert_eq!(
            len.result.records[0].get("len"),
            Some(&Value::UnsignedInteger(0))
        );
    }

    /// Truncating a `red.*` system collection is rejected as read-only.
    #[test]
    fn truncate_system_schema_is_read_only() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        let err = rt
            .execute_query("TRUNCATE COLLECTION red.collections")
            .unwrap_err();
        assert!(format!("{err}").contains("system schema is read-only"));
    }

    /// Peeks up to 100 entries of `queue` and decodes each `payload` column
    /// as JSON. Panics if a record lacks a JSON payload.
    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
        let result = rt
            .execute_query(&format!("QUEUE PEEK {queue} 100"))
            .expect("peek queue");
        result
            .result
            .records
            .iter()
            .map(
                |record| match record.get("payload").expect("payload column") {
                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
                    other => panic!("expected JSON queue payload, got {other:?}"),
                },
            )
            .collect()
    }

    /// Truncating an event-enabled table emits exactly one `truncate` event
    /// carrying the number of removed entities.
    #[test]
    fn truncate_event_enabled_table_emits_single_truncate_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query(
            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
        )
        .unwrap();

        // Drain the insert events so only the truncate event remains.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("TRUNCATE TABLE users").unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 truncate event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("truncate")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            ev.get("entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(3)
        );
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));
    }

    /// Truncating a table without an event subscription succeeds and leaves
    /// the table empty.
    #[test]
    fn truncate_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
            .unwrap();
        rt.execute_query("TRUNCATE TABLE plain").unwrap();
        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
        assert!(rows.result.records.is_empty());
    }

    /// Dropping an event-enabled table emits exactly one `collection_dropped`
    /// event with the final entity count, and the table is gone afterwards.
    #[test]
    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .unwrap();

        // Drain the insert events so only the drop event remains.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("DROP TABLE users").unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 collection_dropped event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("collection_dropped")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            ev.get("final_entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(2)
        );
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));

        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
        assert!(
            format!("{err}").contains("users"),
            "expected not-found error"
        );
    }

    /// Dropping a table without an event subscription succeeds and the table
    /// can no longer be queried.
    #[test]
    fn drop_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("DROP TABLE plain").unwrap();
        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
        assert!(format!("{err}").contains("plain"));
    }

    /// With `WITH EVENTS (INSERT)`, updates and deletes emit no events.
    #[test]
    fn ops_filter_insert_only_ignores_update_and_delete() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
        )
        .unwrap();
        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
            .unwrap();
        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();

        let events = queue_payloads(&rt, "items_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 insert event, got {}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }

    /// A `WHERE` filter on the subscription suppresses events for rows that
    /// do not satisfy it.
    #[test]
    fn where_filter_skips_rows_that_do_not_match() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
            .unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 event (only active), got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("insert")
        );
        let after = ev.get("after").unwrap().as_object().unwrap();
        assert_eq!(
            after.get("status").and_then(crate::json::Value::as_str),
            Some("active")
        );
    }

    /// With both an operation list and a `WHERE` filter, only the matching
    /// insert of the active row produces an event.
    #[test]
    fn ops_filter_and_where_filter_combined() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
            .unwrap();
        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
            .unwrap();
        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
            .unwrap();
        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "items_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 event, got {}: {events:?}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }

    /// For deletes, the `WHERE` filter is applied to the row as it was before
    /// removal: only the active row's delete is emitted.
    #[test]
    fn where_filter_on_delete_checks_before_state() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
            .unwrap();

        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 delete event, got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("delete")
        );
    }

    /// `ALTER TABLE ... ADD COLUMN` works on an event-enabled table and keeps
    /// its subscription enabled.
    #[test]
    fn alter_add_column_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
            .unwrap();
        let contract = rt.db().collection_contract("users").unwrap();
        assert!(
            contract.declared_columns.iter().any(|c| c.name == "phone"),
            "phone column should be in contract"
        );
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }

    /// `DROP COLUMN` and `ENABLE ROW LEVEL SECURITY` both work on an
    /// event-enabled table and keep its subscription enabled.
    #[test]
    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
        )
        .unwrap();
        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
            .unwrap();
        let contract = rt.db().collection_contract("items").unwrap();
        assert!(
            !contract.declared_columns.iter().any(|c| c.name == "secret"),
            "secret column should be removed"
        );
        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
            .unwrap();
        // Re-read the contract: the earlier snapshot predates the RLS change
        // and would not reflect anything that statement did to subscriptions.
        let contract = rt.db().collection_contract("items").unwrap();
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }

    /// `CREATE VECTOR` marks the collection as turbo and materialises its
    /// turbo state.
    #[test]
    fn create_vector_marks_collection_as_turbo_baseline() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE VECTOR embeddings DIM 4").unwrap();
        let store = rt.db().store();
        assert!(
            crate::runtime::vector_turbo_kind::is_turbo(&store, "embeddings"),
            "new vector collections must be turbo-marked baseline"
        );
        assert!(
            rt.db().turbo_state("embeddings").is_some(),
            "turbo_state must materialise after CREATE VECTOR"
        );
    }

    /// A plain table gets neither the turbo marker nor a turbo state.
    #[test]
    fn create_table_does_not_mark_turbo() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT)").unwrap();
        let store = rt.db().store();
        assert!(
            !crate::runtime::vector_turbo_kind::is_turbo(&store, "plain"),
            "non-vector collections must not gain the turbo marker"
        );
        assert!(rt.db().turbo_state("plain").is_none());
    }

    /// The explicit `KIND vector.turbo` spelling is turbo-marked as well.
    #[test]
    fn create_collection_kind_vector_turbo_still_marked() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE COLLECTION turbo_v KIND vector.turbo DIM 4")
            .unwrap();
        let store = rt.db().store();
        assert!(crate::runtime::vector_turbo_kind::is_turbo(
            &store, "turbo_v"
        ));
        assert!(rt.db().turbo_state("turbo_v").is_some());
    }
}