1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault KV key under which a collection's own master key is stored.
///
/// The result has the shape `red.vault.<collection>.master_key`; the collection
/// name is inserted verbatim.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 crate::reserved_fields::ensure_no_reserved_public_item_fields(
37 query.columns.iter().map(|column| column.name.as_str()),
38 &format!("table '{}'", query.name),
39 )?;
40 let exists = store.get_collection(&query.name).is_some();
42 if exists {
43 if query.if_not_exists {
44 return Ok(RuntimeQueryResult::ok_message(
45 raw_query.to_string(),
46 &format!("table '{}' already exists", query.name),
47 "create",
48 ));
49 }
50 return Err(RedDBError::Query(format!(
51 "table '{}' already exists",
52 query.name
53 )));
54 }
55
56 let contract = collection_contract_from_create_table(query)?;
59 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60 store
62 .create_collection(&query.name)
63 .map_err(|err| RedDBError::Internal(err.to_string()))?;
64 for subscription in &contract.subscriptions {
65 ensure_event_target_queue(self, &subscription.target_queue)?;
66 }
67 if let Some(default_ttl_ms) = query.default_ttl_ms {
68 self.inner
69 .db
70 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71 }
72 self.inner
73 .db
74 .save_collection_contract(contract)
75 .map_err(|err| RedDBError::Internal(err.to_string()))?;
76 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77 store.set_config_tree(
78 &format!("red.collection_tenants.{}", query.name),
79 &crate::serde_json::Value::String(tenant_id),
80 );
81 }
82 self.inner
83 .db
84 .persist_metadata()
85 .map_err(|err| RedDBError::Internal(err.to_string()))?;
86 self.refresh_table_planner_stats(&query.name);
87 self.invalidate_result_cache();
88 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99 if let Some(spec) = &query.partition_by {
106 let kind_str = match spec.kind {
107 crate::storage::query::ast::PartitionKind::Range => "range",
108 crate::storage::query::ast::PartitionKind::List => "list",
109 crate::storage::query::ast::PartitionKind::Hash => "hash",
110 };
111 store.set_config_tree(
112 &format!("partition.{}.by", query.name),
113 &crate::serde_json::Value::String(kind_str.to_string()),
114 );
115 store.set_config_tree(
116 &format!("partition.{}.column", query.name),
117 &crate::serde_json::Value::String(spec.column.clone()),
118 );
119 }
120
121 if let Some(col) = &query.tenant_by {
132 store.set_config_tree(
133 &format!("tenant_tables.{}.column", query.name),
134 &crate::serde_json::Value::String(col.clone()),
135 );
136 self.register_tenant_table(&query.name, col);
137 }
138
139 let ttl_suffix = query
140 .default_ttl_ms
141 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142 .unwrap_or_default();
143
144 let tenant_suffix = query
145 .tenant_by
146 .as_ref()
147 .map(|col| format!(" (tenant-scoped by {col})"))
148 .unwrap_or_default();
149
150 Ok(RuntimeQueryResult::ok_message(
151 raw_query.to_string(),
152 &format!(
153 "table '{}' created{}{}",
154 query.name, ttl_suffix, tenant_suffix
155 ),
156 "create",
157 ))
158 }
159
160 fn execute_create_keyed_collection(
161 &self,
162 raw_query: &str,
163 query: &CreateTableQuery,
164 ) -> RedDBResult<RuntimeQueryResult> {
165 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166 if is_system_schema_name(&query.name) {
167 return Err(RedDBError::Query("system schema is read-only".to_string()));
168 }
169 let store = self.inner.db.store();
170 let label = polymorphic_resolver::model_name(query.collection_model);
171 if store.get_collection(&query.name).is_some() {
172 if query.if_not_exists {
173 return Ok(RuntimeQueryResult::ok_message(
174 raw_query.to_string(),
175 &format!("{label} '{}' already exists", query.name),
176 "create",
177 ));
178 }
179 return Err(RedDBError::Query(format!(
180 "{label} '{}' already exists",
181 query.name
182 )));
183 }
184
185 store
186 .create_collection(&query.name)
187 .map_err(|err| RedDBError::Internal(err.to_string()))?;
188 if query.collection_model == CollectionModel::Vault {
189 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190 let key_scope = if query.vault_own_master_key {
191 "own"
192 } else {
193 "cluster"
194 };
195 store.set_config_tree(
196 &format!("red.vault.{}.key_scope", query.name),
197 &crate::serde_json::Value::String(key_scope.to_string()),
198 );
199 store.set_config_tree(
200 &format!("red.vault.{}.status", query.name),
201 &crate::serde_json::Value::String("sealed".to_string()),
202 );
203 }
204 if query.collection_model == CollectionModel::Metrics {
205 for spec in &query.metrics_rollup_policies {
206 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207 .ok_or_else(|| {
208 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209 })?;
210 if policy.source != "raw" {
211 return Err(RedDBError::Query(format!(
212 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213 spec
214 )));
215 }
216 if !matches!(
217 policy.aggregation.as_str(),
218 "avg" | "sum" | "min" | "max" | "count"
219 ) {
220 return Err(RedDBError::Query(format!(
221 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222 spec
223 )));
224 }
225 }
226 if let Some(raw_retention_ms) = query.default_ttl_ms {
227 self.inner
228 .db
229 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230 store.set_config_tree(
231 &format!("red.metrics.{}.raw_retention_ms", query.name),
232 &crate::serde_json::Value::Number(raw_retention_ms as f64),
233 );
234 }
235 let tenant_identity = query
236 .tenant_by
237 .clone()
238 .unwrap_or_else(|| "current_tenant".to_string());
239 store.set_config_tree(
240 &format!("red.metrics.{}.tenant_identity", query.name),
241 &crate::serde_json::Value::String(tenant_identity),
242 );
243 store.set_config_tree(
244 &format!("red.metrics.{}.namespace", query.name),
245 &crate::serde_json::Value::String("default".to_string()),
246 );
247 if !query.metrics_rollup_policies.is_empty() {
248 store.set_config_tree(
249 &format!("red.metrics.{}.rollup_policies", query.name),
250 &crate::serde_json::Value::Array(
251 query
252 .metrics_rollup_policies
253 .iter()
254 .cloned()
255 .map(crate::serde_json::Value::String)
256 .collect(),
257 ),
258 );
259 }
260 }
261 let contract = if query.collection_model == CollectionModel::Metrics {
262 metrics_collection_contract(query)
263 } else {
264 keyed_collection_contract(
265 &query.name,
266 query.collection_model,
267 query.analytics_config.clone(),
268 )
269 };
270 self.inner
271 .db
272 .save_collection_contract(contract)
273 .map_err(|err| RedDBError::Internal(err.to_string()))?;
274 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
275 store.set_config_tree(
276 &format!("red.collection_tenants.{}", query.name),
277 &crate::serde_json::Value::String(tenant_id),
278 );
279 }
280 self.inner
281 .db
282 .persist_metadata()
283 .map_err(|err| RedDBError::Internal(err.to_string()))?;
284 self.invalidate_result_cache();
285
286 Ok(RuntimeQueryResult::ok_message(
287 raw_query.to_string(),
288 &format!("{label} '{}' created", query.name),
289 "create",
290 ))
291 }
292
293 pub fn ensure_system_graph_with_analytics(
302 &self,
303 name: &str,
304 outputs: &[crate::catalog::AnalyticsOutput],
305 ) -> RedDBResult<()> {
306 let store = self.inner.db.store();
307 if store.get_collection(name).is_some() {
308 return Ok(());
309 }
310 let analytics_config = outputs
311 .iter()
312 .map(|output| crate::catalog::AnalyticsViewDescriptor {
313 output: *output,
314 algorithm: None,
315 resolution: None,
316 max_iterations: None,
317 tolerance: None,
318 })
319 .collect();
320 store
321 .create_collection(name)
322 .map_err(|err| RedDBError::Internal(err.to_string()))?;
323 let contract = keyed_collection_contract(
324 name,
325 crate::catalog::CollectionModel::Graph,
326 analytics_config,
327 );
328 self.inner
329 .db
330 .save_collection_contract(contract)
331 .map_err(|err| RedDBError::Internal(err.to_string()))?;
332 self.inner
333 .db
334 .persist_metadata()
335 .map_err(|err| RedDBError::Internal(err.to_string()))?;
336 self.invalidate_result_cache();
337 Ok(())
338 }
339
340 pub fn execute_create_collection(
341 &self,
342 raw_query: &str,
343 query: &CreateCollectionQuery,
344 ) -> RedDBResult<RuntimeQueryResult> {
345 let model = match query.kind.as_str() {
346 "graph" => CollectionModel::Graph,
347 "document" => CollectionModel::Document,
348 "metrics" => CollectionModel::Metrics,
349 "vector.turbo" => {
350 let dimension = query.vector_dimension.ok_or_else(|| {
351 RedDBError::Query(
352 "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
353 )
354 })?;
355 let create = CreateVectorQuery {
356 name: query.name.clone(),
357 dimension,
358 metric: query
359 .vector_metric
360 .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
361 if_not_exists: query.if_not_exists,
362 };
363 let result = self.execute_create_vector(raw_query, &create)?;
364 let store = self.inner.db.store();
371 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
372 self.inner
373 .db
374 .persist_metadata()
375 .map_err(|err| RedDBError::Internal(err.to_string()))?;
376 let _ = self.inner.db.turbo_state(&query.name);
381 return Ok(result);
382 }
383 "blockchain" => CollectionModel::Table,
388 other => {
389 return Err(RedDBError::Query(format!(
390 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
391 )));
392 }
393 };
394 let create = CreateTableQuery {
395 collection_model: model,
396 name: query.name.clone(),
397 columns: Vec::new(),
398 if_not_exists: query.if_not_exists,
399 default_ttl_ms: None,
400 metrics_rollup_policies: Vec::new(),
401 context_index_fields: Vec::new(),
402 context_index_enabled: false,
403 timestamps: false,
404 partition_by: None,
405 tenant_by: None,
406 append_only: false,
407 subscriptions: Vec::new(),
408 analytics_config: Vec::new(),
409 vault_own_master_key: false,
410 };
411 let result = self.execute_create_table(raw_query, &create)?;
412 if query.kind == "blockchain" {
413 self.install_blockchain_kind(&query.name)?;
414 }
415 if !query.allowed_signers.is_empty() {
420 let actor = crate::runtime::impl_core::current_user_projected()
421 .unwrap_or_else(|| "@system/create-collection".to_string());
422 crate::runtime::signed_writes_kind::install(
423 &self.inner.db.store(),
424 &query.name,
425 &query.allowed_signers,
426 &actor,
427 );
428 }
429 Ok(result)
430 }
431
432 fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
436 use crate::runtime::blockchain_kind;
437 use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
438 use std::sync::Arc;
439
440 let store = self.inner.db.store();
441 blockchain_kind::mark_as_chain(&store, name);
442
443 let existing_tip = blockchain_kind::chain_tip(&store, name);
444 if existing_tip.height.is_some() {
445 return Ok(());
446 }
447
448 let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
449 let named: std::collections::HashMap<String, crate::storage::schema::Value> =
450 fields.into_iter().collect();
451 let entity = UnifiedEntity::new(
452 EntityId::new(0),
453 EntityKind::TableRow {
454 table: Arc::from(name),
455 row_id: 0,
456 },
457 EntityData::Row(RowData {
458 columns: Vec::new(),
459 named: Some(named),
460 schema: None,
461 }),
462 );
463 store
464 .insert_auto(name, entity)
465 .map_err(|err| RedDBError::Internal(err.to_string()))?;
466 if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
469 self.inner
470 .chain_tip_cache
471 .lock()
472 .insert(name.to_string(), tip);
473 }
474 Ok(())
475 }
476
477 pub fn execute_create_vector(
478 &self,
479 raw_query: &str,
480 query: &CreateVectorQuery,
481 ) -> RedDBResult<RuntimeQueryResult> {
482 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
483 if is_system_schema_name(&query.name) {
484 return Err(RedDBError::Query("system schema is read-only".to_string()));
485 }
486 let store = self.inner.db.store();
487 if store.get_collection(&query.name).is_some() {
488 if query.if_not_exists {
489 return Ok(RuntimeQueryResult::ok_message(
490 raw_query.to_string(),
491 &format!("vector '{}' already exists", query.name),
492 "create",
493 ));
494 }
495 return Err(RedDBError::Query(format!(
496 "vector '{}' already exists",
497 query.name
498 )));
499 }
500
501 store
502 .create_collection(&query.name)
503 .map_err(|err| RedDBError::Internal(err.to_string()))?;
504 self.inner
505 .db
506 .save_collection_contract(vector_collection_contract(query))
507 .map_err(|err| RedDBError::Internal(err.to_string()))?;
508 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
509 store.set_config_tree(
510 &format!("red.collection_tenants.{}", query.name),
511 &crate::serde_json::Value::String(tenant_id),
512 );
513 }
514 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
523 self.inner
524 .db
525 .persist_metadata()
526 .map_err(|err| RedDBError::Internal(err.to_string()))?;
527 let _ = self.inner.db.turbo_state(&query.name);
532 self.invalidate_result_cache();
533
534 Ok(RuntimeQueryResult::ok_message(
535 raw_query.to_string(),
536 &format!("vector '{}' created", query.name),
537 "create",
538 ))
539 }
540
541 fn provision_vault_key_material(
542 &self,
543 collection: &str,
544 own_master_key: bool,
545 ) -> RedDBResult<()> {
546 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
547 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
548 })?;
549 if !auth_store.is_vault_backed() {
550 return Err(RedDBError::Query(
551 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
552 ));
553 }
554
555 if auth_store.vault_secret_key().is_none() {
556 let key = crate::auth::store::random_bytes(32);
557 auth_store
558 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
559 .map_err(|err| RedDBError::Query(err.to_string()))?;
560 }
561
562 if own_master_key {
563 let key = crate::auth::store::random_bytes(32);
564 auth_store
565 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
566 .map_err(|err| RedDBError::Query(err.to_string()))?;
567 }
568
569 Ok(())
570 }
571
572 pub fn execute_drop_table(
576 &self,
577 raw_query: &str,
578 query: &DropTableQuery,
579 ) -> RedDBResult<RuntimeQueryResult> {
580 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
581 let store = self.inner.db.store();
582
583 if is_system_schema_name(&query.name) {
584 return Err(RedDBError::Query("system schema is read-only".to_string()));
585 }
586
587 let exists = store.get_collection(&query.name).is_some();
588 if !exists {
589 if query.if_exists {
590 return Ok(RuntimeQueryResult::ok_message(
591 raw_query.to_string(),
592 &format!("table '{}' does not exist", query.name),
593 "drop",
594 ));
595 }
596 return Err(RedDBError::NotFound(format!(
597 "table '{}' not found",
598 query.name
599 )));
600 }
601 let actual =
602 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
603 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
604
605 let final_count = store
608 .get_collection(&query.name)
609 .map(|manager| manager.query_all(|_| true).len() as u64)
610 .unwrap_or(0);
611 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
612 self,
613 &query.name,
614 final_count,
615 )?;
616
617 let orphaned_indices: Vec<String> = self
618 .inner
619 .index_store
620 .list_indices(&query.name)
621 .into_iter()
622 .map(|index| index.name)
623 .collect();
624 for name in &orphaned_indices {
625 self.inner.index_store.drop_index(name, &query.name);
626 }
627
628 store
629 .drop_collection(&query.name)
630 .map_err(|err| RedDBError::Internal(err.to_string()))?;
631 self.inner.db.invalidate_vector_index(&query.name);
632 self.inner.db.clear_collection_default_ttl_ms(&query.name);
633 self.inner
634 .db
635 .remove_collection_contract(&query.name)
636 .map_err(|err| RedDBError::Internal(err.to_string()))?;
637 self.clear_table_planner_stats(&query.name);
638 self.invalidate_result_cache();
639 if let Some(store) = self.inner.auth_store.read().clone() {
643 store.invalidate_visible_collections_cache();
644 }
645 self.inner
646 .db
647 .persist_metadata()
648 .map_err(|err| RedDBError::Internal(err.to_string()))?;
649 self.schema_vocabulary_apply(
655 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
656 collection: query.name.clone(),
657 },
658 );
659
660 Ok(RuntimeQueryResult::ok_message(
661 raw_query.to_string(),
662 &format!("table '{}' dropped", query.name),
663 "drop",
664 ))
665 }
666
667 pub fn execute_drop_graph(
668 &self,
669 raw_query: &str,
670 query: &DropGraphQuery,
671 ) -> RedDBResult<RuntimeQueryResult> {
672 self.execute_drop_typed_collection(
673 raw_query,
674 &query.name,
675 query.if_exists,
676 CollectionModel::Graph,
677 "graph",
678 )
679 }
680
681 pub fn execute_drop_vector(
682 &self,
683 raw_query: &str,
684 query: &DropVectorQuery,
685 ) -> RedDBResult<RuntimeQueryResult> {
686 self.execute_drop_typed_collection(
687 raw_query,
688 &query.name,
689 query.if_exists,
690 CollectionModel::Vector,
691 "vector",
692 )
693 }
694
695 pub fn execute_drop_document(
696 &self,
697 raw_query: &str,
698 query: &DropDocumentQuery,
699 ) -> RedDBResult<RuntimeQueryResult> {
700 self.execute_drop_typed_collection(
701 raw_query,
702 &query.name,
703 query.if_exists,
704 CollectionModel::Document,
705 "document",
706 )
707 }
708
709 pub fn execute_drop_kv(
710 &self,
711 raw_query: &str,
712 query: &DropKvQuery,
713 ) -> RedDBResult<RuntimeQueryResult> {
714 let label = polymorphic_resolver::model_name(query.model);
715 self.execute_drop_typed_collection(
716 raw_query,
717 &query.name,
718 query.if_exists,
719 query.model,
720 label,
721 )
722 }
723
724 pub fn execute_drop_collection(
725 &self,
726 raw_query: &str,
727 query: &DropCollectionQuery,
728 ) -> RedDBResult<RuntimeQueryResult> {
729 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
730 if is_system_schema_name(&query.name) {
731 return Err(RedDBError::Query("system schema is read-only".to_string()));
732 }
733 let store = self.inner.db.store();
734 if store.get_collection(&query.name).is_none() {
735 if query.if_exists {
736 return Ok(RuntimeQueryResult::ok_message(
737 raw_query.to_string(),
738 &format!("collection '{}' does not exist", query.name),
739 "drop",
740 ));
741 }
742 return Err(RedDBError::NotFound(format!(
743 "collection '{}' not found",
744 query.name
745 )));
746 }
747
748 let actual =
749 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
750 if let Some(expected) = query.model {
751 polymorphic_resolver::ensure_model_match(expected, actual)?;
752 }
753
754 match actual {
755 CollectionModel::Table => self.execute_drop_table(
756 raw_query,
757 &DropTableQuery {
758 name: query.name.clone(),
759 if_exists: query.if_exists,
760 },
761 ),
762 CollectionModel::TimeSeries => self.execute_drop_timeseries(
763 raw_query,
764 &DropTimeSeriesQuery {
765 name: query.name.clone(),
766 if_exists: query.if_exists,
767 },
768 ),
769 CollectionModel::Queue => self.execute_drop_queue(
770 raw_query,
771 &DropQueueQuery {
772 name: query.name.clone(),
773 if_exists: query.if_exists,
774 },
775 ),
776 CollectionModel::Graph => self.execute_drop_graph(
777 raw_query,
778 &DropGraphQuery {
779 name: query.name.clone(),
780 if_exists: query.if_exists,
781 },
782 ),
783 CollectionModel::Vector => self.execute_drop_vector(
784 raw_query,
785 &DropVectorQuery {
786 name: query.name.clone(),
787 if_exists: query.if_exists,
788 },
789 ),
790 CollectionModel::Document => self.execute_drop_document(
791 raw_query,
792 &DropDocumentQuery {
793 name: query.name.clone(),
794 if_exists: query.if_exists,
795 },
796 ),
797 CollectionModel::Kv => self.execute_drop_kv(
798 raw_query,
799 &DropKvQuery {
800 name: query.name.clone(),
801 if_exists: query.if_exists,
802 model: CollectionModel::Kv,
803 },
804 ),
805 CollectionModel::Config => self.execute_drop_kv(
806 raw_query,
807 &DropKvQuery {
808 name: query.name.clone(),
809 if_exists: query.if_exists,
810 model: CollectionModel::Config,
811 },
812 ),
813 CollectionModel::Vault => self.execute_drop_kv(
814 raw_query,
815 &DropKvQuery {
816 name: query.name.clone(),
817 if_exists: query.if_exists,
818 model: CollectionModel::Vault,
819 },
820 ),
821 CollectionModel::Hll => self.execute_probabilistic_command(
822 raw_query,
823 &ProbabilisticCommand::DropHll {
824 name: query.name.clone(),
825 if_exists: query.if_exists,
826 },
827 ),
828 CollectionModel::Sketch => self.execute_probabilistic_command(
829 raw_query,
830 &ProbabilisticCommand::DropSketch {
831 name: query.name.clone(),
832 if_exists: query.if_exists,
833 },
834 ),
835 CollectionModel::Filter => self.execute_probabilistic_command(
836 raw_query,
837 &ProbabilisticCommand::DropFilter {
838 name: query.name.clone(),
839 if_exists: query.if_exists,
840 },
841 ),
842 CollectionModel::Metrics => self.execute_drop_typed_collection(
843 raw_query,
844 &query.name,
845 query.if_exists,
846 CollectionModel::Metrics,
847 "metrics",
848 ),
849 CollectionModel::Mixed => self.execute_drop_typed_collection(
850 raw_query,
851 &query.name,
852 query.if_exists,
853 CollectionModel::Mixed,
854 "collection",
855 ),
856 }
857 }
858
859 fn require_graph_for_analytics(&self, name: &str) -> RedDBResult<()> {
865 let is_graph = self
866 .inner
867 .db
868 .collection_contract(name)
869 .map(|c| c.declared_model == crate::catalog::CollectionModel::Graph)
870 .unwrap_or(false);
871 if !is_graph {
872 return Err(RedDBError::Query(format!(
873 "ALTER GRAPH ... ANALYTICS: '{name}' is not a graph collection"
874 )));
875 }
876 Ok(())
877 }
878
879 pub fn execute_alter_table(
885 &self,
886 raw_query: &str,
887 query: &AlterTableQuery,
888 ) -> RedDBResult<RuntimeQueryResult> {
889 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
890 let store = self.inner.db.store();
891
892 if store.get_collection(&query.name).is_none() {
894 return Err(RedDBError::NotFound(format!(
895 "table '{}' not found",
896 query.name
897 )));
898 }
899
900 let mut messages = Vec::new();
901
902 let fields_added: Vec<String> = query
904 .operations
905 .iter()
906 .filter_map(|op| {
907 if let AlterOperation::AddColumn(col) = op {
908 Some(col.name.clone())
909 } else {
910 None
911 }
912 })
913 .collect();
914 let fields_removed: Vec<String> = query
915 .operations
916 .iter()
917 .filter_map(|op| {
918 if let AlterOperation::DropColumn(name) = op {
919 Some(name.clone())
920 } else {
921 None
922 }
923 })
924 .collect();
925
926 for op in &query.operations {
927 match op {
928 AlterOperation::AddColumn(col) => {
929 messages.push(format!("column '{}' added", col.name));
931 }
932 AlterOperation::DropColumn(name) => {
933 messages.push(format!("column '{}' dropped", name));
934 }
935 AlterOperation::RenameColumn { from, to } => {
936 messages.push(format!("column '{}' renamed to '{}'", from, to));
937 }
938 AlterOperation::AttachPartition { child, bound } => {
939 store.set_config_tree(
943 &format!("partition.{}.children.{}", query.name, child),
944 &crate::serde_json::Value::String(bound.clone()),
945 );
946 messages.push(format!(
947 "partition '{child}' attached to '{}' ({bound})",
948 query.name
949 ));
950 }
951 AlterOperation::DetachPartition { child } => {
952 store.set_config_tree(
953 &format!("partition.{}.children.{}", query.name, child),
954 &crate::serde_json::Value::Null,
955 );
956 messages.push(format!(
957 "partition '{child}' detached from '{}'",
958 query.name
959 ));
960 }
961 AlterOperation::EnableRowLevelSecurity => {
962 self.inner
963 .rls_enabled_tables
964 .write()
965 .insert(query.name.clone());
966 store.set_config_tree(
968 &format!("rls.enabled.{}", query.name),
969 &crate::serde_json::Value::Bool(true),
970 );
971 self.invalidate_plan_cache();
972 messages.push(format!("row level security enabled on '{}'", query.name));
973 }
974 AlterOperation::DisableRowLevelSecurity => {
975 self.inner.rls_enabled_tables.write().remove(&query.name);
976 store.set_config_tree(
977 &format!("rls.enabled.{}", query.name),
978 &crate::serde_json::Value::Null,
979 );
980 self.invalidate_plan_cache();
981 messages.push(format!("row level security disabled on '{}'", query.name));
982 }
983 AlterOperation::EnableTenancy { column } => {
985 store.set_config_tree(
986 &format!("tenant_tables.{}.column", query.name),
987 &crate::serde_json::Value::String(column.clone()),
988 );
989 self.register_tenant_table(&query.name, column);
990 self.invalidate_plan_cache();
991 messages.push(format!(
992 "tenancy enabled on '{}' by column '{column}'",
993 query.name
994 ));
995 }
996 AlterOperation::DisableTenancy => {
997 store.set_config_tree(
998 &format!("tenant_tables.{}.column", query.name),
999 &crate::serde_json::Value::Null,
1000 );
1001 self.unregister_tenant_table(&query.name);
1002 self.invalidate_plan_cache();
1003 messages.push(format!("tenancy disabled on '{}'", query.name));
1004 }
1005 AlterOperation::SetAppendOnly(on) => {
1006 messages.push(format!(
1011 "append_only {} on '{}'",
1012 if *on { "enabled" } else { "disabled" },
1013 query.name
1014 ));
1015 }
1016 AlterOperation::SetVersioned(on) => {
1017 self.vcs_set_versioned(&query.name, *on)?;
1024 messages.push(format!(
1025 "versioned {} on '{}'",
1026 if *on { "enabled" } else { "disabled" },
1027 query.name
1028 ));
1029 }
1030 AlterOperation::EnableEvents(subscription) => {
1031 let mut subscription = subscription.clone();
1032 subscription.source = query.name.clone();
1033 validate_event_subscriptions(
1034 self,
1035 &query.name,
1036 std::slice::from_ref(&subscription),
1037 )?;
1038 ensure_event_target_queue(self, &subscription.target_queue)?;
1039 messages.push(format!(
1040 "events enabled on '{}' to '{}'",
1041 query.name, subscription.target_queue
1042 ));
1043 }
1044 AlterOperation::DisableEvents => {
1045 messages.push(format!("events disabled on '{}'", query.name));
1046 }
1047 AlterOperation::AddSubscription { name, descriptor } => {
1048 let mut sub = descriptor.clone();
1049 sub.name = name.clone();
1050 sub.source = query.name.clone();
1051 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
1052 ensure_event_target_queue(self, &sub.target_queue)?;
1053 messages.push(format!(
1054 "subscription '{}' added on '{}' to '{}'",
1055 name, query.name, sub.target_queue
1056 ));
1057 }
1058 AlterOperation::DropSubscription { name } => {
1059 messages.push(format!(
1060 "subscription '{}' dropped on '{}'",
1061 name, query.name
1062 ));
1063 }
1064 AlterOperation::AddSigner { pubkey } => {
1065 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1072 return Err(RedDBError::Query(format!(
1073 "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
1074 recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
1075 query.name
1076 )));
1077 }
1078 let actor = crate::runtime::impl_core::current_user_projected()
1079 .unwrap_or_else(|| "@system/alter".to_string());
1080 let changed = crate::runtime::signed_writes_kind::add_signer(
1081 &store,
1082 &query.name,
1083 *pubkey,
1084 &actor,
1085 );
1086 messages.push(format!(
1087 "signer {} on '{}'",
1088 if changed { "added" } else { "already present" },
1089 query.name
1090 ));
1091 }
1092 AlterOperation::RevokeSigner { pubkey } => {
1093 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1094 return Err(RedDBError::Query(format!(
1095 "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1096 query.name
1097 )));
1098 }
1099 let actor = crate::runtime::impl_core::current_user_projected()
1100 .unwrap_or_else(|| "@system/alter".to_string());
1101 let changed = crate::runtime::signed_writes_kind::revoke_signer(
1102 &store,
1103 &query.name,
1104 pubkey,
1105 &actor,
1106 );
1107 messages.push(format!(
1108 "signer {} on '{}'",
1109 if changed {
1110 "revoked"
1111 } else {
1112 "already revoked"
1113 },
1114 query.name
1115 ));
1116 }
1117 AlterOperation::SetRetention { duration_ms } => {
1118 let existing = self.inner.db.collection_contract(&query.name);
1124 let has_ts_column = existing
1125 .as_ref()
1126 .map(retention_timestamp_column_exists)
1127 .unwrap_or(false);
1128 if !has_ts_column {
1129 return Err(RedDBError::Query(format!(
1130 "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1131 column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1132 or enable WITH timestamps = true before setting a \
1133 retention policy",
1134 query.name
1135 )));
1136 }
1137 messages.push(format!(
1138 "retention set to {duration_ms} ms on '{}'",
1139 query.name
1140 ));
1141 }
1142 AlterOperation::UnsetRetention => {
1143 messages.push(format!("retention cleared on '{}'", query.name));
1144 }
1145 AlterOperation::AddAnalytics(views) => {
1146 self.require_graph_for_analytics(&query.name)?;
1151 let existing = self.inner.db.collection_contract(&query.name);
1152 let enabled: std::collections::BTreeSet<crate::catalog::AnalyticsOutput> =
1153 existing
1154 .as_ref()
1155 .map(|c| c.analytics_config.iter().map(|v| v.output).collect())
1156 .unwrap_or_default();
1157 for view in views {
1158 if enabled.contains(&view.output) {
1159 messages.push(format!(
1162 "analytics '{}' already enabled on '{}'",
1163 view.output.as_str(),
1164 query.name
1165 ));
1166 } else {
1167 messages.push(format!(
1168 "analytics '{}' enabled on '{}'",
1169 view.output.as_str(),
1170 query.name
1171 ));
1172 }
1173 }
1174 }
1175 AlterOperation::DropAnalytics(output) => {
1176 self.require_graph_for_analytics(&query.name)?;
1177 let enabled = self
1178 .inner
1179 .db
1180 .collection_contract(&query.name)
1181 .map(|c| c.analytics_config.iter().any(|v| v.output == *output))
1182 .unwrap_or(false);
1183 if !enabled {
1184 return Err(RedDBError::Query(format!(
1187 "ALTER GRAPH DROP ANALYTICS: analytics output '{}' is not enabled on graph '{}'",
1188 output.as_str(),
1189 query.name
1190 )));
1191 }
1192 messages.push(format!(
1193 "analytics '{}' disabled on '{}'",
1194 output.as_str(),
1195 query.name
1196 ));
1197 }
1198 }
1199 }
1200
1201 let mut contract = self
1202 .inner
1203 .db
1204 .collection_contract(&query.name)
1205 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1206 apply_alter_operations_to_contract(&mut contract, &query.operations);
1207 contract.version = contract.version.saturating_add(1);
1208 contract.updated_at_unix_ms = current_unix_ms();
1209 self.inner
1210 .db
1211 .save_collection_contract(contract)
1212 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1213 if !fields_added.is_empty() || !fields_removed.is_empty() {
1217 let sub_names: Vec<String> = self
1218 .inner
1219 .db
1220 .collection_contract(&query.name)
1221 .map(|c| {
1222 c.subscriptions
1223 .iter()
1224 .filter(|s| s.enabled)
1225 .map(|s| s.name.clone())
1226 .collect()
1227 })
1228 .unwrap_or_default();
1229 if !sub_names.is_empty() {
1230 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1231 collection: query.name.clone(),
1232 subscription_names: sub_names.join(", "),
1233 fields_added: fields_added.join(", "),
1234 fields_removed: fields_removed.join(", "),
1235 lsn: self.cdc_current_lsn(),
1236 }
1237 .emit_global();
1238 }
1239 }
1240
1241 self.clear_table_planner_stats(&query.name);
1242 self.invalidate_result_cache();
1243 let post_alter_columns: Vec<String> = self
1248 .inner
1249 .db
1250 .collection_contract(&query.name)
1251 .map(|contract| {
1252 contract
1253 .declared_columns
1254 .iter()
1255 .map(|col| col.name.clone())
1256 .collect()
1257 })
1258 .unwrap_or_default();
1259 self.schema_vocabulary_apply(
1260 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1261 collection: query.name.clone(),
1262 columns: post_alter_columns,
1263 type_tags: Vec::new(),
1264 description: None,
1265 },
1266 );
1267
1268 let message = if messages.is_empty() {
1269 format!("table '{}' altered (no operations)", query.name)
1270 } else {
1271 format!("table '{}' altered: {}", query.name, messages.join(", "))
1272 };
1273
1274 Ok(RuntimeQueryResult::ok_message(
1275 raw_query.to_string(),
1276 &message,
1277 "alter",
1278 ))
1279 }
1280
1281 pub fn execute_explain_alter(
1288 &self,
1289 raw_query: &str,
1290 query: &ExplainAlterQuery,
1291 ) -> RedDBResult<RuntimeQueryResult> {
1292 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1296
1297 let current_contract = self.inner.db.collection_contract(&query.target.name);
1298
1299 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1300 .as_ref()
1301 .map(|c| c.declared_columns.clone())
1302 .unwrap_or_default();
1303
1304 let diff = super::schema_diff::compute_column_diff(
1305 &query.target.name,
1306 ¤t_columns,
1307 &query.target.columns,
1308 );
1309
1310 let rendered = match query.format {
1311 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1312 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1313 };
1314
1315 let format_label = match query.format {
1316 ExplainFormat::Sql => "sql",
1317 ExplainFormat::Json => "json",
1318 };
1319
1320 let columns = vec![
1321 "table".to_string(),
1322 "format".to_string(),
1323 "diff".to_string(),
1324 ];
1325 let row = vec![
1326 ("table".to_string(), Value::text(query.target.name.clone())),
1327 ("format".to_string(), Value::text(format_label.to_string())),
1328 ("diff".to_string(), Value::text(rendered)),
1329 ];
1330
1331 Ok(RuntimeQueryResult::ok_records(
1332 raw_query.to_string(),
1333 columns,
1334 vec![row],
1335 "explain",
1336 ))
1337 }
1338
1339 pub fn execute_create_index(
1344 &self,
1345 raw_query: &str,
1346 query: &CreateIndexQuery,
1347 ) -> RedDBResult<RuntimeQueryResult> {
1348 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1349 let store = self.inner.db.store();
1350
1351 let manager = store
1353 .get_collection(&query.table)
1354 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1355
1356 let method_kind = match query.method {
1357 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1358 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1359 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1360 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1361 };
1362
1363 let entities = manager.query_all(|_| true);
1373 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1374 entities
1375 .iter()
1376 .map(|e| {
1377 let fields = match &e.data {
1378 crate::storage::EntityData::Row(row) => {
1379 if let Some(ref named) = row.named {
1380 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1381 } else if let Some(ref schema) = row.schema {
1382 schema
1386 .iter()
1387 .zip(row.columns.iter())
1388 .map(|(k, v)| (k.clone(), v.clone()))
1389 .collect()
1390 } else {
1391 Vec::new()
1392 }
1393 }
1394 crate::storage::EntityData::Node(node) => node
1395 .properties
1396 .iter()
1397 .map(|(k, v)| (k.clone(), v.clone()))
1398 .collect(),
1399 _ => Vec::new(),
1400 };
1401 (e.id, fields)
1402 })
1403 .collect();
1404
1405 let indexed_count = self
1407 .inner
1408 .index_store
1409 .create_index(
1410 &query.name,
1411 &query.table,
1412 &query.columns,
1413 method_kind,
1414 query.unique,
1415 &entity_fields,
1416 )
1417 .map_err(RedDBError::Internal)?;
1418
1419 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1420 &query.table,
1421 &entity_fields,
1422 );
1423 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1424 self.invalidate_plan_cache();
1425
1426 self.inner
1428 .index_store
1429 .register(super::index_store::RegisteredIndex {
1430 name: query.name.clone(),
1431 collection: query.table.clone(),
1432 columns: query.columns.clone(),
1433 method: method_kind,
1434 unique: query.unique,
1435 });
1436 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1440 collection: query.table.clone(),
1441 index: query.name.clone(),
1442 columns: query.columns.clone(),
1443 });
1444
1445 let method_str = format!("{}", query.method);
1446 let unique_str = if query.unique { "unique " } else { "" };
1447 let cols = query.columns.join(", ");
1448
1449 Ok(RuntimeQueryResult::ok_message(
1450 raw_query.to_string(),
1451 &format!(
1452 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1453 unique_str, query.name, query.table, cols, method_str, indexed_count
1454 ),
1455 "create",
1456 ))
1457 }
1458
1459 pub fn execute_drop_index(
1463 &self,
1464 raw_query: &str,
1465 query: &DropIndexQuery,
1466 ) -> RedDBResult<RuntimeQueryResult> {
1467 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1468 let store = self.inner.db.store();
1469
1470 if store.get_collection(&query.table).is_none() {
1472 if query.if_exists {
1473 return Ok(RuntimeQueryResult::ok_message(
1474 raw_query.to_string(),
1475 &format!("table '{}' does not exist", query.table),
1476 "drop",
1477 ));
1478 }
1479 return Err(RedDBError::NotFound(format!(
1480 "table '{}' not found",
1481 query.table
1482 )));
1483 }
1484
1485 self.inner.index_store.drop_index(&query.name, &query.table);
1487 self.invalidate_plan_cache();
1488 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1490 collection: query.table.clone(),
1491 index: query.name.clone(),
1492 });
1493
1494 Ok(RuntimeQueryResult::ok_message(
1495 raw_query.to_string(),
1496 &format!("index '{}' dropped from '{}'", query.name, query.table),
1497 "drop",
1498 ))
1499 }
1500
1501 fn execute_drop_typed_collection(
1502 &self,
1503 raw_query: &str,
1504 name: &str,
1505 if_exists: bool,
1506 expected_model: CollectionModel,
1507 label: &str,
1508 ) -> RedDBResult<RuntimeQueryResult> {
1509 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1510 if is_system_schema_name(name) {
1511 return Err(RedDBError::Query("system schema is read-only".to_string()));
1512 }
1513 let store = self.inner.db.store();
1514 if store.get_collection(name).is_none() {
1515 if if_exists {
1516 return Ok(RuntimeQueryResult::ok_message(
1517 raw_query.to_string(),
1518 &format!("{label} '{name}' does not exist"),
1519 "drop",
1520 ));
1521 }
1522 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1523 }
1524
1525 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1526 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1527 self.drop_collection_storage(raw_query, name, label)
1528 }
1529
1530 pub fn execute_truncate(
1531 &self,
1532 raw_query: &str,
1533 query: &TruncateQuery,
1534 ) -> RedDBResult<RuntimeQueryResult> {
1535 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1536 if is_system_schema_name(&query.name) {
1537 return Err(RedDBError::Query("system schema is read-only".to_string()));
1538 }
1539
1540 let label = query
1541 .model
1542 .map(polymorphic_resolver::model_name)
1543 .unwrap_or("collection");
1544 let store = self.inner.db.store();
1545 if store.get_collection(&query.name).is_none() {
1546 if query.if_exists {
1547 return Ok(RuntimeQueryResult::ok_message(
1548 raw_query.to_string(),
1549 &format!("{label} '{}' does not exist", query.name),
1550 "truncate",
1551 ));
1552 }
1553 return Err(RedDBError::NotFound(format!(
1554 "{label} '{}' not found",
1555 query.name
1556 )));
1557 }
1558
1559 let actual =
1560 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1561 if let Some(expected) = query.model {
1562 polymorphic_resolver::ensure_model_match(expected, actual)?;
1563 }
1564
1565 if actual == CollectionModel::Queue {
1566 return self.execute_queue_command(
1567 raw_query,
1568 &QueueCommand::Purge {
1569 queue: query.name.clone(),
1570 },
1571 );
1572 }
1573
1574 let affected = self.truncate_collection_entities(&query.name)?;
1576 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1578 self.inner.db.invalidate_vector_index(&query.name);
1579 self.clear_table_planner_stats(&query.name);
1580 self.invalidate_result_cache();
1581
1582 Ok(RuntimeQueryResult::ok_message(
1583 raw_query.to_string(),
1584 &format!(
1585 "{affected} entities truncated from {label} '{}'",
1586 query.name
1587 ),
1588 "truncate",
1589 ))
1590 }
1591
1592 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1593 let store = self.inner.db.store();
1594 let Some(manager) = store.get_collection(name) else {
1595 return Ok(0);
1596 };
1597 let entities = manager.query_all(|_| true);
1598 if entities.is_empty() {
1599 return Ok(0);
1600 }
1601
1602 for entity in &entities {
1603 let fields = entity_index_fields(&entity.data);
1604 self.inner
1605 .index_store
1606 .index_entity_delete(name, entity.id, &fields)
1607 .map_err(RedDBError::Internal)?;
1608 }
1609
1610 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1611 let deleted_ids = store
1612 .delete_batch(name, &ids)
1613 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1614 for id in &deleted_ids {
1615 store.context_index().remove_entity(*id);
1616 }
1617 Ok(deleted_ids.len() as u64)
1618 }
1619
1620 fn drop_collection_storage(
1621 &self,
1622 raw_query: &str,
1623 name: &str,
1624 label: &str,
1625 ) -> RedDBResult<RuntimeQueryResult> {
1626 let store = self.inner.db.store();
1627
1628 let final_count = store
1631 .get_collection(name)
1632 .map(|manager| manager.query_all(|_| true).len() as u64)
1633 .unwrap_or(0);
1634 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1635 self,
1636 name,
1637 final_count,
1638 )?;
1639
1640 let orphaned_indices: Vec<String> = self
1641 .inner
1642 .index_store
1643 .list_indices(name)
1644 .into_iter()
1645 .map(|index| index.name)
1646 .collect();
1647 for index_name in &orphaned_indices {
1648 self.inner.index_store.drop_index(index_name, name);
1649 }
1650
1651 store
1652 .drop_collection(name)
1653 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1654 self.inner.db.invalidate_vector_index(name);
1655 self.inner.db.clear_collection_default_ttl_ms(name);
1656 self.inner
1657 .db
1658 .remove_collection_contract(name)
1659 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1660 self.clear_table_planner_stats(name);
1661 self.invalidate_result_cache();
1662 if let Some(store) = self.inner.auth_store.read().clone() {
1663 store.invalidate_visible_collections_cache();
1664 }
1665 self.inner
1666 .db
1667 .persist_metadata()
1668 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1669 self.schema_vocabulary_apply(
1670 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1671 collection: name.to_string(),
1672 },
1673 );
1674
1675 Ok(RuntimeQueryResult::ok_message(
1676 raw_query.to_string(),
1677 &format!("{label} '{name}' dropped"),
1678 "drop",
1679 ))
1680 }
1681}
1682
/// Returns `true` when `name` is the `red` schema itself or starts with one of
/// the reserved prefixes `red.` / `__red_schema_`. The comparison is
/// case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1686
1687fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1688 match data {
1689 EntityData::Row(row) => {
1690 if let Some(ref named) = row.named {
1691 named
1692 .iter()
1693 .map(|(key, value)| (key.clone(), value.clone()))
1694 .collect()
1695 } else if let Some(ref schema) = row.schema {
1696 schema
1697 .iter()
1698 .zip(row.columns.iter())
1699 .map(|(key, value)| (key.clone(), value.clone()))
1700 .collect()
1701 } else {
1702 Vec::new()
1703 }
1704 }
1705 EntityData::Node(node) => node
1706 .properties
1707 .iter()
1708 .map(|(key, value)| (key.clone(), value.clone()))
1709 .collect(),
1710 _ => Vec::new(),
1711 }
1712}
1713
1714fn collection_contract_from_create_table(
1715 query: &CreateTableQuery,
1716) -> RedDBResult<crate::physical::CollectionContract> {
1717 let now = current_unix_ms();
1718 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1719 .columns
1720 .iter()
1721 .map(declared_column_contract_from_ddl)
1722 .collect();
1723 if query.timestamps {
1724 declared_columns.push(crate::physical::DeclaredColumnContract {
1728 name: "created_at".to_string(),
1729 data_type: "BIGINT".to_string(),
1730 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1731 not_null: true,
1732 default: None,
1733 compress: None,
1734 unique: false,
1735 primary_key: false,
1736 enum_variants: Vec::new(),
1737 array_element: None,
1738 decimal_precision: None,
1739 });
1740 declared_columns.push(crate::physical::DeclaredColumnContract {
1741 name: "updated_at".to_string(),
1742 data_type: "BIGINT".to_string(),
1743 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1744 not_null: true,
1745 default: None,
1746 compress: None,
1747 unique: false,
1748 primary_key: false,
1749 enum_variants: Vec::new(),
1750 array_element: None,
1751 decimal_precision: None,
1752 });
1753 }
1754 Ok(crate::physical::CollectionContract {
1755 name: query.name.clone(),
1756 declared_model: crate::catalog::CollectionModel::Table,
1757 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1758 origin: crate::physical::ContractOrigin::Explicit,
1759 version: 1,
1760 created_at_unix_ms: now,
1761 updated_at_unix_ms: now,
1762 default_ttl_ms: query.default_ttl_ms,
1763 vector_dimension: None,
1764 vector_metric: None,
1765 context_index_fields: query.context_index_fields.clone(),
1766 declared_columns,
1767 table_def: Some(build_table_def_from_create_table(query)?),
1768 timestamps_enabled: query.timestamps,
1769 context_index_enabled: query.context_index_enabled
1770 || !query.context_index_fields.is_empty(),
1771 metrics_raw_retention_ms: None,
1772 metrics_rollup_policies: Vec::new(),
1773 metrics_tenant_identity: None,
1774 metrics_namespace: None,
1775 append_only: query.append_only,
1776 subscriptions: query.subscriptions.clone(),
1777 analytics_config: Vec::new(),
1778 session_key: None,
1779 session_gap_ms: None,
1780 retention_duration_ms: None,
1781 })
1782}
1783
1784fn default_collection_contract_for_existing_table(
1785 name: &str,
1786) -> crate::physical::CollectionContract {
1787 let now = current_unix_ms();
1788 crate::physical::CollectionContract {
1789 name: name.to_string(),
1790 declared_model: crate::catalog::CollectionModel::Table,
1791 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1792 origin: crate::physical::ContractOrigin::Explicit,
1793 version: 0,
1794 created_at_unix_ms: now,
1795 updated_at_unix_ms: now,
1796 default_ttl_ms: None,
1797 vector_dimension: None,
1798 vector_metric: None,
1799 context_index_fields: Vec::new(),
1800 declared_columns: Vec::new(),
1801 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1802 timestamps_enabled: false,
1803 context_index_enabled: false,
1804 metrics_raw_retention_ms: None,
1805 metrics_rollup_policies: Vec::new(),
1806 metrics_tenant_identity: None,
1807 metrics_namespace: None,
1808 append_only: false,
1809 subscriptions: Vec::new(),
1810 analytics_config: Vec::new(),
1811 session_key: None,
1812 session_gap_ms: None,
1813 retention_duration_ms: None,
1814 }
1815}
1816
1817fn keyed_collection_contract(
1818 name: &str,
1819 model: crate::catalog::CollectionModel,
1820 analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
1821) -> crate::physical::CollectionContract {
1822 let now = current_unix_ms();
1823 crate::physical::CollectionContract {
1824 name: name.to_string(),
1825 declared_model: model,
1826 schema_mode: crate::catalog::SchemaMode::Dynamic,
1827 origin: crate::physical::ContractOrigin::Explicit,
1828 version: 1,
1829 created_at_unix_ms: now,
1830 updated_at_unix_ms: now,
1831 default_ttl_ms: None,
1832 vector_dimension: None,
1833 vector_metric: None,
1834 context_index_fields: Vec::new(),
1835 declared_columns: Vec::new(),
1836 table_def: None,
1837 timestamps_enabled: false,
1838 context_index_enabled: false,
1839 metrics_raw_retention_ms: None,
1840 metrics_rollup_policies: Vec::new(),
1841 metrics_tenant_identity: None,
1842 metrics_namespace: None,
1843 append_only: false,
1844 subscriptions: Vec::new(),
1845 analytics_config,
1846 session_key: None,
1847 session_gap_ms: None,
1848 retention_duration_ms: None,
1849 }
1850}
1851
1852fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1853 let now = current_unix_ms();
1854 crate::physical::CollectionContract {
1855 name: query.name.clone(),
1856 declared_model: crate::catalog::CollectionModel::Metrics,
1857 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1858 origin: crate::physical::ContractOrigin::Explicit,
1859 version: 1,
1860 created_at_unix_ms: now,
1861 updated_at_unix_ms: now,
1862 default_ttl_ms: query.default_ttl_ms,
1863 vector_dimension: None,
1864 vector_metric: None,
1865 context_index_fields: Vec::new(),
1866 declared_columns: Vec::new(),
1867 table_def: None,
1868 timestamps_enabled: false,
1869 context_index_enabled: false,
1870 metrics_raw_retention_ms: query.default_ttl_ms,
1871 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1872 metrics_tenant_identity: Some(
1873 query
1874 .tenant_by
1875 .clone()
1876 .unwrap_or_else(|| "current_tenant".to_string()),
1877 ),
1878 metrics_namespace: Some("default".to_string()),
1879 append_only: true,
1880 subscriptions: Vec::new(),
1881 analytics_config: Vec::new(),
1882 session_key: None,
1883 session_gap_ms: None,
1884 retention_duration_ms: None,
1885 }
1886}
1887
1888fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1889 let now = current_unix_ms();
1890 crate::physical::CollectionContract {
1891 name: query.name.clone(),
1892 declared_model: crate::catalog::CollectionModel::Vector,
1893 schema_mode: crate::catalog::SchemaMode::Dynamic,
1894 origin: crate::physical::ContractOrigin::Explicit,
1895 version: 1,
1896 created_at_unix_ms: now,
1897 updated_at_unix_ms: now,
1898 default_ttl_ms: None,
1899 vector_dimension: Some(query.dimension),
1900 vector_metric: Some(query.metric),
1901 context_index_fields: Vec::new(),
1902 declared_columns: Vec::new(),
1903 table_def: None,
1904 timestamps_enabled: false,
1905 context_index_enabled: false,
1906 metrics_raw_retention_ms: None,
1907 metrics_rollup_policies: Vec::new(),
1908 metrics_tenant_identity: None,
1909 metrics_namespace: None,
1910 append_only: false,
1911 subscriptions: Vec::new(),
1912 analytics_config: Vec::new(),
1913 session_key: None,
1914 session_gap_ms: None,
1915 retention_duration_ms: None,
1916 }
1917}
1918
1919fn declared_column_contract_from_ddl(
1920 column: &CreateColumnDef,
1921) -> crate::physical::DeclaredColumnContract {
1922 crate::physical::DeclaredColumnContract {
1923 name: column.name.clone(),
1924 data_type: column.data_type.clone(),
1925 sql_type: Some(column.sql_type.clone()),
1926 not_null: column.not_null,
1927 default: column.default.clone(),
1928 compress: column.compress,
1929 unique: column.unique,
1930 primary_key: column.primary_key,
1931 enum_variants: column.enum_variants.clone(),
1932 array_element: column.array_element.clone(),
1933 decimal_precision: column.decimal_precision,
1934 }
1935}
1936
1937fn apply_alter_operations_to_contract(
1938 contract: &mut crate::physical::CollectionContract,
1939 operations: &[AlterOperation],
1940) {
1941 if contract.table_def.is_none() {
1942 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1943 }
1944 for operation in operations {
1945 match operation {
1946 AlterOperation::AddColumn(column) => {
1947 if !contract
1948 .declared_columns
1949 .iter()
1950 .any(|existing| existing.name == column.name)
1951 {
1952 contract
1953 .declared_columns
1954 .push(declared_column_contract_from_ddl(column));
1955 }
1956 if let Some(table_def) = contract.table_def.as_mut() {
1957 if table_def.get_column(&column.name).is_none() {
1958 if let Ok(column_def) = column_def_from_ddl(column) {
1959 if column.primary_key {
1960 table_def.primary_key.push(column.name.clone());
1961 table_def.constraints.push(
1962 crate::storage::schema::Constraint::new(
1963 format!("pk_{}", column.name),
1964 crate::storage::schema::ConstraintType::PrimaryKey,
1965 )
1966 .on_columns(vec![column.name.clone()]),
1967 );
1968 }
1969 if column.unique {
1970 table_def.constraints.push(
1971 crate::storage::schema::Constraint::new(
1972 format!("uniq_{}", column.name),
1973 crate::storage::schema::ConstraintType::Unique,
1974 )
1975 .on_columns(vec![column.name.clone()]),
1976 );
1977 }
1978 if column.not_null {
1979 table_def.constraints.push(
1980 crate::storage::schema::Constraint::new(
1981 format!("not_null_{}", column.name),
1982 crate::storage::schema::ConstraintType::NotNull,
1983 )
1984 .on_columns(vec![column.name.clone()]),
1985 );
1986 }
1987 table_def.columns.push(column_def);
1988 }
1989 }
1990 }
1991 }
1992 AlterOperation::DropColumn(name) => {
1993 contract
1994 .declared_columns
1995 .retain(|column| column.name != *name);
1996 if let Some(table_def) = contract.table_def.as_mut() {
1997 if let Some(index) = table_def.column_index(name) {
1998 table_def.columns.remove(index);
1999 }
2000 table_def.primary_key.retain(|column| column != name);
2001 table_def.constraints.retain(|constraint| {
2002 !constraint.columns.iter().any(|column| column == name)
2003 });
2004 table_def
2005 .indexes
2006 .retain(|index| !index.columns.iter().any(|column| column == name));
2007 }
2008 }
2009 AlterOperation::RenameColumn { from, to } => {
2010 if contract
2011 .declared_columns
2012 .iter()
2013 .any(|column| column.name == *to)
2014 {
2015 continue;
2016 }
2017 if let Some(column) = contract
2018 .declared_columns
2019 .iter_mut()
2020 .find(|column| column.name == *from)
2021 {
2022 column.name = to.clone();
2023 }
2024 if let Some(table_def) = contract.table_def.as_mut() {
2025 if let Some(column) = table_def
2026 .columns
2027 .iter_mut()
2028 .find(|column| column.name == *from)
2029 {
2030 column.name = to.clone();
2031 }
2032 for primary_key in &mut table_def.primary_key {
2033 if *primary_key == *from {
2034 *primary_key = to.clone();
2035 }
2036 }
2037 for constraint in &mut table_def.constraints {
2038 for column in &mut constraint.columns {
2039 if *column == *from {
2040 *column = to.clone();
2041 }
2042 }
2043 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
2044 for column in ref_columns {
2045 if *column == *from {
2046 *column = to.clone();
2047 }
2048 }
2049 }
2050 }
2051 for index in &mut table_def.indexes {
2052 for column in &mut index.columns {
2053 if *column == *from {
2054 *column = to.clone();
2055 }
2056 }
2057 }
2058 }
2059 }
2060 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
2063 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
2067 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
2070 AlterOperation::SetAppendOnly(on) => {
2071 contract.append_only = *on;
2072 }
2073 AlterOperation::SetVersioned(_) => {}
2076 AlterOperation::EnableEvents(subscription) => {
2077 let mut subscription = subscription.clone();
2078 subscription.source = contract.name.clone();
2079 subscription.enabled = true;
2080 if let Some(existing) = contract
2081 .subscriptions
2082 .iter_mut()
2083 .find(|existing| existing.target_queue == subscription.target_queue)
2084 {
2085 *existing = subscription;
2086 } else {
2087 contract.subscriptions.push(subscription);
2088 }
2089 }
2090 AlterOperation::DisableEvents => {
2091 for subscription in &mut contract.subscriptions {
2092 subscription.enabled = false;
2093 }
2094 }
2095 AlterOperation::AddSubscription { name, descriptor } => {
2096 let mut sub = descriptor.clone();
2097 sub.name = name.clone();
2098 sub.source = contract.name.clone();
2099 sub.enabled = true;
2100 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
2101 {
2102 *existing = sub;
2103 } else {
2104 contract.subscriptions.push(sub);
2105 }
2106 }
2107 AlterOperation::DropSubscription { name } => {
2108 contract.subscriptions.retain(|s| s.name != *name);
2109 }
2110 AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
2115 AlterOperation::SetRetention { duration_ms } => {
2116 contract.retention_duration_ms = Some(*duration_ms);
2117 }
2118 AlterOperation::UnsetRetention => {
2119 contract.retention_duration_ms = None;
2120 }
2121 AlterOperation::AddAnalytics(views) => {
2125 for view in views {
2126 if !contract
2130 .analytics_config
2131 .iter()
2132 .any(|existing| existing.output == view.output)
2133 {
2134 contract.analytics_config.push(view.clone());
2135 }
2136 }
2137 }
2138 AlterOperation::DropAnalytics(output) => {
2139 contract
2140 .analytics_config
2141 .retain(|view| view.output != *output);
2142 }
2143 }
2144 }
2145}
2146
2147pub(crate) fn retention_timestamp_column_exists(
2152 contract: &crate::physical::CollectionContract,
2153) -> bool {
2154 if contract.timestamps_enabled {
2155 return true;
2156 }
2157 if matches!(
2158 contract.declared_model,
2159 crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
2160 ) {
2161 return true;
2165 }
2166 contract
2167 .declared_columns
2168 .iter()
2169 .any(|column| is_temporal_data_type(&column.data_type))
2170}
2171
/// Returns `true` when `data_type` names one of the temporal types
/// `TIMESTAMP`, `TIMESTAMPMS`, `TIMESTAMP_MS`, `DATETIME` or `DATE`,
/// compared ASCII case-insensitively and without trimming.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];

    TEMPORAL_TYPES
        .iter()
        .any(|candidate| data_type.eq_ignore_ascii_case(candidate))
}
2179
2180fn validate_event_subscriptions(
2181 runtime: &RedDBRuntime,
2182 source: &str,
2183 subscriptions: &[crate::catalog::SubscriptionDescriptor],
2184) -> RedDBResult<()> {
2185 for subscription in subscriptions
2186 .iter()
2187 .filter(|subscription| subscription.enabled)
2188 {
2189 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2190 return Err(RedDBError::Query(
2191 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2192 ));
2193 }
2194 validate_subscription_auth(runtime, source, subscription)?;
2195 if subscription.target_queue == source
2196 || subscription_would_create_cycle(
2197 &runtime.inner.db,
2198 source,
2199 &subscription.target_queue,
2200 )
2201 {
2202 return Err(RedDBError::Query(
2203 "subscription would create cycle".to_string(),
2204 ));
2205 }
2206 audit_subscription_redact_gap(runtime, source, subscription);
2207 }
2208 Ok(())
2209}
2210
2211fn validate_subscription_auth(
2212 runtime: &RedDBRuntime,
2213 source: &str,
2214 subscription: &crate::catalog::SubscriptionDescriptor,
2215) -> RedDBResult<()> {
2216 let auth_store = match runtime.inner.auth_store.read().clone() {
2217 Some(store) => store,
2218 None => return Ok(()),
2219 };
2220 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2221 Some(identity) => identity,
2222 None => return Ok(()),
2223 };
2224 let tenant = crate::runtime::impl_core::current_tenant();
2225 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2226
2227 if auth_store.iam_authorization_enabled() {
2228 let ctx = crate::auth::policies::EvalContext {
2229 principal_tenant: tenant.clone(),
2230 current_tenant: tenant.clone(),
2231 peer_ip: None,
2232 mfa_present: false,
2233 now_ms: crate::auth::now_ms(),
2234 principal_is_admin_role: role == crate::auth::Role::Admin,
2235 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2236 principal_is_platform_scoped: principal.tenant.is_none(),
2237 };
2238 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2239 if let Some(t) = tenant.as_deref() {
2240 source_resource = source_resource.with_tenant(t.to_string());
2241 }
2242 if !auth_store.check_policy_authz_with_role(
2243 &principal,
2244 "select",
2245 &source_resource,
2246 &ctx,
2247 role,
2248 ) {
2249 return Err(RedDBError::Query(format!(
2250 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2251 principal, source_resource.kind, source_resource.name
2252 )));
2253 }
2254
2255 let mut target_resource =
2256 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2257 if let Some(t) = tenant.as_deref() {
2258 target_resource = target_resource.with_tenant(t.to_string());
2259 }
2260 if !auth_store.check_policy_authz_with_role(
2261 &principal,
2262 "write",
2263 &target_resource,
2264 &ctx,
2265 role,
2266 ) {
2267 return Err(RedDBError::Query(format!(
2268 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2269 principal, target_resource.kind, target_resource.name
2270 )));
2271 }
2272 return Ok(());
2273 }
2274
2275 let ctx = crate::auth::privileges::AuthzContext {
2276 principal: &username,
2277 effective_role: role,
2278 tenant: tenant.as_deref(),
2279 };
2280 auth_store
2281 .check_grant(
2282 &ctx,
2283 crate::auth::privileges::Action::Select,
2284 &crate::auth::privileges::Resource::table_from_name(source),
2285 )
2286 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2287 auth_store
2288 .check_grant(
2289 &ctx,
2290 crate::auth::privileges::Action::Insert,
2291 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2292 )
2293 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2294 Ok(())
2295}
2296
2297fn audit_subscription_redact_gap(
2298 runtime: &RedDBRuntime,
2299 source: &str,
2300 subscription: &crate::catalog::SubscriptionDescriptor,
2301) {
2302 let auth_store = match runtime.inner.auth_store.read().clone() {
2303 Some(store) if store.iam_authorization_enabled() => store,
2304 _ => return,
2305 };
2306 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2307 Some(identity) => identity,
2308 None => return,
2309 };
2310 let tenant = crate::runtime::impl_core::current_tenant();
2311 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2312 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2313 if missing.is_empty() {
2314 return;
2315 }
2316
2317 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2318 tracing::warn!(
2319 target: "reddb::operator",
2320 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2321 source,
2322 subscription.target_queue,
2323 columns
2324 );
2325 let mut event = AuditEvent::builder("subscription_redact_gap")
2326 .principal(username)
2327 .source(AuditAuthSource::System)
2328 .resource(format!(
2329 "subscription:{}->{}",
2330 source, subscription.target_queue
2331 ))
2332 .outcome(Outcome::Success)
2333 .field(AuditFieldEscaper::field("source", source))
2334 .field(AuditFieldEscaper::field(
2335 "target_queue",
2336 subscription.target_queue.clone(),
2337 ))
2338 .field(AuditFieldEscaper::field(
2339 "subscription",
2340 subscription.name.clone(),
2341 ))
2342 .field(AuditFieldEscaper::field("columns", columns))
2343 .field(AuditFieldEscaper::field("role", role.as_str()));
2344 if let Some(t) = tenant {
2345 event = event.tenant(t);
2346 }
2347 runtime.inner.audit_log.record_event(event.build());
2348}
2349
2350fn subscription_redact_gap_columns(
2351 auth_store: &crate::auth::store::AuthStore,
2352 principal: &crate::auth::UserId,
2353 source: &str,
2354 subscription: &crate::catalog::SubscriptionDescriptor,
2355) -> BTreeSet<String> {
2356 let redacted: HashSet<String> = subscription
2357 .redact_fields
2358 .iter()
2359 .map(|field| field.to_ascii_lowercase())
2360 .collect();
2361 auth_store
2362 .effective_policies(principal)
2363 .iter()
2364 .flat_map(|policy| policy.statements.iter())
2365 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2366 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2367 .flat_map(|statement| statement.resources.iter())
2368 .filter_map(|resource| denied_column_for_source(resource, source))
2369 .filter(|column| !redact_covers_column(&redacted, source, column))
2370 .collect()
2371}
2372
2373fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2374 match pattern {
2375 crate::auth::policies::ActionPattern::Wildcard => true,
2376 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2377 crate::auth::policies::ActionPattern::Prefix(prefix) => {
2378 "select".len() > prefix.len() + 1
2379 && "select".starts_with(prefix)
2380 && "select".as_bytes()[prefix.len()] == b':'
2381 }
2382 }
2383}
2384
2385fn denied_column_for_source(
2386 resource: &crate::auth::policies::ResourcePattern,
2387 source: &str,
2388) -> Option<String> {
2389 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2390 return None;
2391 };
2392 if kind != "column" {
2393 return None;
2394 }
2395 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2396 (column.table_resource_name() == source).then_some(column.column)
2397}
2398
/// Reports whether the redact set covers `column` of `source`.
///
/// A column is covered when the set contains `*`, the lower-cased column
/// name, or the lower-cased `source.column` form. The entries in `redacted`
/// are compared as-is, so callers are expected to lower-case them.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
2404
2405fn subscription_would_create_cycle(
2406 db: &crate::storage::unified::devx::RedDB,
2407 source: &str,
2408 target: &str,
2409) -> bool {
2410 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2411 for contract in db.collection_contracts() {
2412 for subscription in contract
2413 .subscriptions
2414 .into_iter()
2415 .filter(|subscription| subscription.enabled)
2416 {
2417 graph
2418 .entry(subscription.source)
2419 .or_default()
2420 .push(subscription.target_queue);
2421 }
2422 }
2423 graph
2424 .entry(source.to_string())
2425 .or_default()
2426 .push(target.to_string());
2427
2428 let mut stack = vec![target.to_string()];
2429 let mut seen = HashSet::new();
2430 while let Some(node) = stack.pop() {
2431 if node == source {
2432 return true;
2433 }
2434 if !seen.insert(node.clone()) {
2435 continue;
2436 }
2437 if let Some(next) = graph.get(&node) {
2438 stack.extend(next.iter().cloned());
2439 }
2440 }
2441 false
2442}
2443
/// Crate-visible wrapper around the private `ensure_event_target_queue`.
///
/// Forwards `runtime` and `queue` unchanged and returns the callee's result.
pub(crate) fn ensure_event_target_queue_pub(
    runtime: &RedDBRuntime,
    queue: &str,
) -> RedDBResult<()> {
    ensure_event_target_queue(runtime, queue)
}
2450
2451fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2452 let store = runtime.inner.db.store();
2453 if store.get_collection(queue).is_some() {
2454 return Ok(());
2455 }
2456 store
2457 .create_collection(queue)
2458 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2459 runtime
2460 .inner
2461 .db
2462 .save_collection_contract(event_queue_collection_contract(queue))
2463 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2464 store.set_config_tree(
2465 &format!("queue.{queue}.mode"),
2466 &crate::serde_json::Value::String("fanout".to_string()),
2467 );
2468 Ok(())
2469}
2470
2471fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2472 let now = current_unix_ms();
2473 crate::physical::CollectionContract {
2474 name: queue.to_string(),
2475 declared_model: crate::catalog::CollectionModel::Queue,
2476 schema_mode: crate::catalog::SchemaMode::Dynamic,
2477 origin: crate::physical::ContractOrigin::Implicit,
2478 version: 1,
2479 created_at_unix_ms: now,
2480 updated_at_unix_ms: now,
2481 default_ttl_ms: None,
2482 vector_dimension: None,
2483 vector_metric: None,
2484 context_index_fields: Vec::new(),
2485 declared_columns: Vec::new(),
2486 table_def: None,
2487 timestamps_enabled: false,
2488 context_index_enabled: false,
2489 metrics_raw_retention_ms: None,
2490 metrics_rollup_policies: Vec::new(),
2491 metrics_tenant_identity: None,
2492 metrics_namespace: None,
2493 append_only: true,
2494 subscriptions: Vec::new(),
2495 analytics_config: Vec::new(),
2496 session_key: None,
2497 session_gap_ms: None,
2498 retention_duration_ms: None,
2499 }
2500}
2501
2502fn build_table_def_from_create_table(
2503 query: &CreateTableQuery,
2504) -> RedDBResult<crate::storage::schema::TableDef> {
2505 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2506 for column in &query.columns {
2507 if column.primary_key {
2508 table.primary_key.push(column.name.clone());
2509 table.constraints.push(
2510 crate::storage::schema::Constraint::new(
2511 format!("pk_{}", column.name),
2512 crate::storage::schema::ConstraintType::PrimaryKey,
2513 )
2514 .on_columns(vec![column.name.clone()]),
2515 );
2516 }
2517 if column.unique {
2518 table.constraints.push(
2519 crate::storage::schema::Constraint::new(
2520 format!("uniq_{}", column.name),
2521 crate::storage::schema::ConstraintType::Unique,
2522 )
2523 .on_columns(vec![column.name.clone()]),
2524 );
2525 }
2526 if column.not_null {
2527 table.constraints.push(
2528 crate::storage::schema::Constraint::new(
2529 format!("not_null_{}", column.name),
2530 crate::storage::schema::ConstraintType::NotNull,
2531 )
2532 .on_columns(vec![column.name.clone()]),
2533 );
2534 }
2535 table.columns.push(column_def_from_ddl(column)?);
2536 }
2537 if query.timestamps {
2542 table.columns.push(
2543 crate::storage::schema::ColumnDef::new(
2544 "created_at".to_string(),
2545 crate::storage::schema::DataType::UnsignedInteger,
2546 )
2547 .not_null(),
2548 );
2549 table.columns.push(
2550 crate::storage::schema::ColumnDef::new(
2551 "updated_at".to_string(),
2552 crate::storage::schema::DataType::UnsignedInteger,
2553 )
2554 .not_null(),
2555 );
2556 table.constraints.push(
2557 crate::storage::schema::Constraint::new(
2558 "not_null_created_at".to_string(),
2559 crate::storage::schema::ConstraintType::NotNull,
2560 )
2561 .on_columns(vec!["created_at".to_string()]),
2562 );
2563 table.constraints.push(
2564 crate::storage::schema::Constraint::new(
2565 "not_null_updated_at".to_string(),
2566 crate::storage::schema::ConstraintType::NotNull,
2567 )
2568 .on_columns(vec!["updated_at".to_string()]),
2569 );
2570 }
2571 table
2572 .validate()
2573 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2574 Ok(table)
2575}
2576
2577fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2578 let data_type = resolve_declared_data_type(&column.data_type)
2579 .map_err(|err| RedDBError::Query(err.to_string()))?;
2580 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2581 if column.not_null {
2582 column_def = column_def.not_null();
2583 }
2584 if let Some(default) = &column.default {
2585 column_def = column_def.with_default(default.as_bytes().to_vec());
2586 }
2587 if column.compress.unwrap_or(0) > 0 {
2588 column_def = column_def.compressed();
2589 }
2590 if !column.enum_variants.is_empty() {
2591 column_def = column_def.with_variants(column.enum_variants.clone());
2592 }
2593 if let Some(precision) = column.decimal_precision {
2594 column_def = column_def.with_precision(precision);
2595 }
2596 if let Some(element_type) = &column.array_element {
2597 column_def = column_def.with_element_type(
2598 resolve_declared_data_type(element_type)
2599 .map_err(|err| RedDBError::Query(err.to_string()))?,
2600 );
2601 }
2602 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2603 if column.unique {
2604 column_def = column_def.with_metadata("unique", "true");
2605 }
2606 if column.primary_key {
2607 column_def = column_def.with_metadata("primary_key", "true");
2608 }
2609 Ok(column_def)
2610}
2611
/// Returns the wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2618
2619#[cfg(test)]
2620mod tests {
2621 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2622 use crate::auth::store::{AuthStore, PrincipalRef};
2623 use crate::auth::UserId;
2624 use crate::auth::{AuthConfig, Role};
2625 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2626 use crate::storage::schema::Value;
2627 use crate::{RedDBOptions, RedDBRuntime};
2628 use std::sync::Arc;
2629
2630 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2631 Policy {
2632 id: id.to_string(),
2633 version: 1,
2634 tenant: None,
2635 created_at: 0,
2636 updated_at: 0,
2637 statements: vec![Statement {
2638 sid: None,
2639 effect: Effect::Allow,
2640 actions: vec![ActionPattern::Exact(action.to_string())],
2641 resources: vec![ResourcePattern::Exact {
2642 kind: "collection".to_string(),
2643 name: collection.to_string(),
2644 }],
2645 condition: None,
2646 }],
2647 }
2648 }
2649
2650 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2651 let store = Arc::new(AuthStore::new(AuthConfig::default()));
2652 *rt.inner.auth_store.write() = Some(store.clone());
2653 store
2654 }
2655
2656 #[test]
2657 fn drop_denied_without_iam_policy() {
2658 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2659 rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2660 let store = wire_auth_store(&rt);
2661 let select_only = Policy {
2663 id: "select-only".to_string(),
2664 version: 1,
2665 tenant: None,
2666 created_at: 0,
2667 updated_at: 0,
2668 statements: vec![Statement {
2669 sid: None,
2670 effect: Effect::Allow,
2671 actions: vec![ActionPattern::Exact("select".to_string())],
2672 resources: vec![ResourcePattern::Wildcard],
2673 condition: None,
2674 }],
2675 };
2676 store.put_policy_internal(select_only).unwrap();
2677 let alice = UserId::from_parts(None, "alice");
2678 store
2679 .attach_policy(PrincipalRef::User(alice), "select-only")
2680 .unwrap();
2681 set_current_auth_identity("alice".to_string(), Role::Write);
2682 let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2683 clear_current_auth_identity();
2684 assert!(
2685 format!("{err}").contains("denied by IAM policy"),
2686 "got: {err}"
2687 );
2688 }
2689
2690 #[test]
2691 fn drop_allowed_with_explicit_iam_policy() {
2692 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2693 rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2694 let store = wire_auth_store(&rt);
2695 let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2696 store.put_policy_internal(policy).unwrap();
2697 let bob = UserId::from_parts(None, "bob");
2698 store
2699 .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2700 .unwrap();
2701 set_current_auth_identity("bob".to_string(), Role::Write);
2702 rt.execute_query("DROP TABLE bar").unwrap();
2703 clear_current_auth_identity();
2704 }
2705
2706 #[test]
2707 fn drop_allowed_with_wildcard_iam_policy() {
2708 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2709 rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2710 let store = wire_auth_store(&rt);
2711 let policy = Policy {
2712 id: "allow-drop-all".to_string(),
2713 version: 1,
2714 tenant: None,
2715 created_at: 0,
2716 updated_at: 0,
2717 statements: vec![Statement {
2718 sid: None,
2719 effect: Effect::Allow,
2720 actions: vec![ActionPattern::Exact("drop".to_string())],
2721 resources: vec![ResourcePattern::Wildcard],
2722 condition: None,
2723 }],
2724 };
2725 store.put_policy_internal(policy).unwrap();
2726 let carl = UserId::from_parts(None, "carl");
2727 store
2728 .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2729 .unwrap();
2730 set_current_auth_identity("carl".to_string(), Role::Write);
2731 rt.execute_query("DROP TABLE baz").unwrap();
2732 clear_current_auth_identity();
2733 }
2734
2735 #[test]
2736 fn truncate_denied_without_iam_policy() {
2737 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2738 rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2739 let store = wire_auth_store(&rt);
2740 store
2747 .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
2748 let select_only = Policy {
2750 id: "select-only-2".to_string(),
2751 version: 1,
2752 tenant: None,
2753 created_at: 0,
2754 updated_at: 0,
2755 statements: vec![Statement {
2756 sid: None,
2757 effect: Effect::Allow,
2758 actions: vec![ActionPattern::Exact("select".to_string())],
2759 resources: vec![ResourcePattern::Wildcard],
2760 condition: None,
2761 }],
2762 };
2763 store.put_policy_internal(select_only).unwrap();
2764 let dana = UserId::from_parts(None, "dana");
2765 store
2766 .attach_policy(PrincipalRef::User(dana), "select-only-2")
2767 .unwrap();
2768 set_current_auth_identity("dana".to_string(), Role::Write);
2769 let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2770 clear_current_auth_identity();
2771 assert!(
2772 format!("{err}").contains("denied by IAM policy"),
2773 "got: {err}"
2774 );
2775 }
2776
2777 #[test]
2778 fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2779 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2780 rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2781 .unwrap();
2782 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2783 .unwrap();
2784 rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2785 .unwrap();
2786
2787 let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2788 assert_eq!(truncated.statement_type, "truncate");
2789 assert_eq!(truncated.affected_rows, 0);
2790
2791 let empty = rt.execute_query("SELECT id FROM users").unwrap();
2792 assert!(empty.result.records.is_empty());
2793
2794 rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2795 .unwrap();
2796 let selected = rt
2797 .execute_query("SELECT name FROM users WHERE id = 3")
2798 .unwrap();
2799 let name = selected.result.records[0].get("name").unwrap();
2800 assert_eq!(name, &Value::text("cy"));
2801 assert!(rt.db().collection_contract("users").is_some());
2802 assert!(rt
2803 .inner
2804 .index_store
2805 .list_indices("users")
2806 .iter()
2807 .any(|index| index.name == "idx_users_id"));
2808 }
2809
2810 #[test]
2811 fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2812 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2813 rt.execute_query("CREATE QUEUE tasks").unwrap();
2814 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2815
2816 let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2817 assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2818
2819 rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2820 let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2821 assert_eq!(
2822 len.result.records[0].get("len"),
2823 Some(&Value::UnsignedInteger(0))
2824 );
2825 }
2826
2827 #[test]
2828 fn truncate_system_schema_is_read_only() {
2829 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2830 let err = rt
2831 .execute_query("TRUNCATE COLLECTION red.collections")
2832 .unwrap_err();
2833 assert!(format!("{err}").contains("system schema is read-only"));
2834 }
2835
2836 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2839 let result = rt
2840 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2841 .expect("peek queue");
2842 result
2843 .result
2844 .records
2845 .iter()
2846 .map(
2847 |record| match record.get("payload").expect("payload column") {
2848 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2849 other => panic!("expected JSON queue payload, got {other:?}"),
2850 },
2851 )
2852 .collect()
2853 }
2854
2855 #[test]
2858 fn truncate_event_enabled_table_emits_single_truncate_event() {
2859 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2860 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2861 .unwrap();
2862 rt.execute_query(
2863 "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2864 )
2865 .unwrap();
2866
2867 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2869
2870 rt.execute_query("TRUNCATE TABLE users").unwrap();
2871
2872 let events = queue_payloads(&rt, "users_events");
2873 assert_eq!(
2875 events.len(),
2876 1,
2877 "expected 1 truncate event, got {}",
2878 events.len()
2879 );
2880 let ev = events[0].as_object().expect("event is object");
2881 assert_eq!(
2882 ev.get("op").and_then(crate::json::Value::as_str),
2883 Some("truncate")
2884 );
2885 assert_eq!(
2886 ev.get("collection").and_then(crate::json::Value::as_str),
2887 Some("users")
2888 );
2889 assert_eq!(
2890 ev.get("entities_count")
2891 .and_then(crate::json::Value::as_u64),
2892 Some(3)
2893 );
2894 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2895 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2896 assert!(ev
2897 .get("event_id")
2898 .and_then(crate::json::Value::as_str)
2899 .is_some_and(|s| !s.is_empty()));
2900 }
2901
2902 #[test]
2904 fn truncate_no_events_collection_emits_nothing() {
2905 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2906 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2907 .unwrap();
2908 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2909 .unwrap();
2910 rt.execute_query("TRUNCATE TABLE plain").unwrap();
2912 let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2914 assert!(rows.result.records.is_empty());
2915 }
2916
2917 #[test]
2921 fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2922 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2923 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2924 .unwrap();
2925 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2926 .unwrap();
2927
2928 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2930
2931 rt.execute_query("DROP TABLE users").unwrap();
2932
2933 let events = queue_payloads(&rt, "users_events");
2935 assert_eq!(
2936 events.len(),
2937 1,
2938 "expected 1 collection_dropped event, got {}",
2939 events.len()
2940 );
2941 let ev = events[0].as_object().expect("event is object");
2942 assert_eq!(
2943 ev.get("op").and_then(crate::json::Value::as_str),
2944 Some("collection_dropped")
2945 );
2946 assert_eq!(
2947 ev.get("collection").and_then(crate::json::Value::as_str),
2948 Some("users")
2949 );
2950 assert_eq!(
2951 ev.get("final_entities_count")
2952 .and_then(crate::json::Value::as_u64),
2953 Some(2)
2954 );
2955 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2956 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2957 assert!(ev
2958 .get("event_id")
2959 .and_then(crate::json::Value::as_str)
2960 .is_some_and(|s| !s.is_empty()));
2961
2962 let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2964 assert!(
2965 format!("{err}").contains("users"),
2966 "expected not-found error"
2967 );
2968 }
2969
2970 #[test]
2973 fn drop_no_events_collection_emits_nothing() {
2974 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2975 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2976 .unwrap();
2977 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2978 .unwrap();
2979 rt.execute_query("DROP TABLE plain").unwrap();
2980 let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2982 assert!(format!("{err}").contains("plain"));
2983 }
2984
2985 #[test]
2989 fn ops_filter_insert_only_ignores_update_and_delete() {
2990 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2991 rt.execute_query(
2992 "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2993 )
2994 .unwrap();
2995 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
2996 .unwrap();
2997 rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
2998 .unwrap();
2999 rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
3000
3001 let events = queue_payloads(&rt, "items_events");
3002 assert_eq!(
3004 events.len(),
3005 1,
3006 "expected 1 insert event, got {}",
3007 events.len()
3008 );
3009 assert_eq!(
3010 events[0]
3011 .as_object()
3012 .unwrap()
3013 .get("op")
3014 .and_then(crate::json::Value::as_str),
3015 Some("insert")
3016 );
3017 }
3018
3019 #[test]
3021 fn where_filter_skips_rows_that_do_not_match() {
3022 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3023 rt.execute_query(
3024 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
3025 )
3026 .unwrap();
3027
3028 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
3030 .unwrap();
3031 rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
3033 .unwrap();
3034
3035 let events = queue_payloads(&rt, "users_events");
3036 assert_eq!(
3037 events.len(),
3038 1,
3039 "expected 1 event (only active), got {}",
3040 events.len()
3041 );
3042 let ev = events[0].as_object().unwrap();
3043 assert_eq!(
3044 ev.get("op").and_then(crate::json::Value::as_str),
3045 Some("insert")
3046 );
3047 let after = ev.get("after").unwrap().as_object().unwrap();
3048 assert_eq!(
3049 after.get("status").and_then(crate::json::Value::as_str),
3050 Some("active")
3051 );
3052 }
3053
3054 #[test]
3056 fn ops_filter_and_where_filter_combined() {
3057 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3058 rt.execute_query(
3059 "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
3060 )
3061 .unwrap();
3062
3063 rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
3065 .unwrap();
3066 rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
3068 .unwrap();
3069 rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
3071 .unwrap();
3072 rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
3074
3075 let events = queue_payloads(&rt, "items_events");
3076 assert_eq!(
3078 events.len(),
3079 1,
3080 "expected 1 event, got {}: {events:?}",
3081 events.len()
3082 );
3083 assert_eq!(
3084 events[0]
3085 .as_object()
3086 .unwrap()
3087 .get("op")
3088 .and_then(crate::json::Value::as_str),
3089 Some("insert")
3090 );
3091 }
3092
3093 #[test]
3095 fn where_filter_on_delete_checks_before_state() {
3096 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3097 rt.execute_query(
3098 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
3099 )
3100 .unwrap();
3101
3102 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
3103 .unwrap();
3104
3105 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3107 rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
3109
3110 let events = queue_payloads(&rt, "users_events");
3111 assert_eq!(
3112 events.len(),
3113 1,
3114 "expected 1 delete event, got {}",
3115 events.len()
3116 );
3117 let ev = events[0].as_object().unwrap();
3118 assert_eq!(
3119 ev.get("op").and_then(crate::json::Value::as_str),
3120 Some("delete")
3121 );
3122 }
3123
3124 #[test]
3128 fn alter_add_column_on_event_enabled_table_succeeds() {
3129 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3130 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
3131 .unwrap();
3132 rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
3134 .unwrap();
3135 let contract = rt.db().collection_contract("users").unwrap();
3137 assert!(
3138 contract.declared_columns.iter().any(|c| c.name == "phone"),
3139 "phone column should be in contract"
3140 );
3141 assert!(
3143 contract.subscriptions.iter().any(|s| s.enabled),
3144 "subscription should remain enabled"
3145 );
3146 }
3147
3148 #[test]
3151 fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
3152 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3153 rt.execute_query(
3154 "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
3155 )
3156 .unwrap();
3157 rt.execute_query("ALTER TABLE items DROP COLUMN secret")
3159 .unwrap();
3160 let contract = rt.db().collection_contract("items").unwrap();
3161 assert!(
3162 !contract.declared_columns.iter().any(|c| c.name == "secret"),
3163 "secret column should be removed"
3164 );
3165 rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
3167 .unwrap();
3168 assert!(
3170 contract.subscriptions.iter().any(|s| s.enabled),
3171 "subscription should remain enabled"
3172 );
3173 }
3174
3175 #[test]
3181 fn create_vector_marks_collection_as_turbo_baseline() {
3182 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3183 rt.execute_query("CREATE VECTOR embeddings DIM 4").unwrap();
3184 let store = rt.db().store();
3185 assert!(
3186 crate::runtime::vector_turbo_kind::is_turbo(&store, "embeddings"),
3187 "new vector collections must be turbo-marked baseline"
3188 );
3189 assert!(
3190 rt.db().turbo_state("embeddings").is_some(),
3191 "turbo_state must materialise after CREATE VECTOR"
3192 );
3193 }
3194
3195 #[test]
3199 fn create_table_does_not_mark_turbo() {
3200 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3201 rt.execute_query("CREATE TABLE plain (id INT)").unwrap();
3202 let store = rt.db().store();
3203 assert!(
3204 !crate::runtime::vector_turbo_kind::is_turbo(&store, "plain"),
3205 "non-vector collections must not gain the turbo marker"
3206 );
3207 assert!(rt.db().turbo_state("plain").is_none());
3208 }
3209
3210 #[test]
3213 fn create_collection_kind_vector_turbo_still_marked() {
3214 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3215 rt.execute_query("CREATE COLLECTION turbo_v KIND vector.turbo DIM 4")
3216 .unwrap();
3217 let store = rt.db().store();
3218 assert!(crate::runtime::vector_turbo_kind::is_turbo(
3219 &store, "turbo_v"
3220 ));
3221 assert!(rt.db().turbo_state("turbo_v").is_some());
3222 }
3223}