1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
15fn vault_master_key_ref(collection: &str) -> String {
16 format!("red.vault.{collection}.master_key")
17}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 crate::reserved_fields::ensure_no_reserved_public_item_fields(
37 query.columns.iter().map(|column| column.name.as_str()),
38 &format!("table '{}'", query.name),
39 )?;
40 let exists = store.get_collection(&query.name).is_some();
42 if exists {
43 if query.if_not_exists {
44 return Ok(RuntimeQueryResult::ok_message(
45 raw_query.to_string(),
46 &format!("table '{}' already exists", query.name),
47 "create",
48 ));
49 }
50 return Err(RedDBError::Query(format!(
51 "table '{}' already exists",
52 query.name
53 )));
54 }
55
56 let contract = collection_contract_from_create_table(query)?;
59 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60 store
62 .create_collection(&query.name)
63 .map_err(|err| RedDBError::Internal(err.to_string()))?;
64 for subscription in &contract.subscriptions {
65 ensure_event_target_queue(self, &subscription.target_queue)?;
66 }
67 if let Some(default_ttl_ms) = query.default_ttl_ms {
68 self.inner
69 .db
70 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71 }
72 self.inner
73 .db
74 .save_collection_contract(contract)
75 .map_err(|err| RedDBError::Internal(err.to_string()))?;
76 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77 store.set_config_tree(
78 &format!("red.collection_tenants.{}", query.name),
79 &crate::serde_json::Value::String(tenant_id),
80 );
81 }
82 self.inner
83 .db
84 .persist_metadata()
85 .map_err(|err| RedDBError::Internal(err.to_string()))?;
86 self.refresh_table_planner_stats(&query.name);
87 self.invalidate_result_cache();
88 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91 self.schema_vocabulary_apply(
92 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93 collection: query.name.clone(),
94 columns,
95 type_tags: Vec::new(),
96 description: None,
97 },
98 );
99 if let Some(spec) = &query.partition_by {
106 let kind_str = match spec.kind {
107 crate::storage::query::ast::PartitionKind::Range => "range",
108 crate::storage::query::ast::PartitionKind::List => "list",
109 crate::storage::query::ast::PartitionKind::Hash => "hash",
110 };
111 store.set_config_tree(
112 &format!("partition.{}.by", query.name),
113 &crate::serde_json::Value::String(kind_str.to_string()),
114 );
115 store.set_config_tree(
116 &format!("partition.{}.column", query.name),
117 &crate::serde_json::Value::String(spec.column.clone()),
118 );
119 }
120
121 if let Some(col) = &query.tenant_by {
132 store.set_config_tree(
133 &format!("tenant_tables.{}.column", query.name),
134 &crate::serde_json::Value::String(col.clone()),
135 );
136 self.register_tenant_table(&query.name, col);
137 }
138
139 let ttl_suffix = query
140 .default_ttl_ms
141 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142 .unwrap_or_default();
143
144 let tenant_suffix = query
145 .tenant_by
146 .as_ref()
147 .map(|col| format!(" (tenant-scoped by {col})"))
148 .unwrap_or_default();
149
150 Ok(RuntimeQueryResult::ok_message(
151 raw_query.to_string(),
152 &format!(
153 "table '{}' created{}{}",
154 query.name, ttl_suffix, tenant_suffix
155 ),
156 "create",
157 ))
158 }
159
160 fn execute_create_keyed_collection(
161 &self,
162 raw_query: &str,
163 query: &CreateTableQuery,
164 ) -> RedDBResult<RuntimeQueryResult> {
165 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166 if is_system_schema_name(&query.name) {
167 return Err(RedDBError::Query("system schema is read-only".to_string()));
168 }
169 let store = self.inner.db.store();
170 let label = polymorphic_resolver::model_name(query.collection_model);
171 if store.get_collection(&query.name).is_some() {
172 if query.if_not_exists {
173 return Ok(RuntimeQueryResult::ok_message(
174 raw_query.to_string(),
175 &format!("{label} '{}' already exists", query.name),
176 "create",
177 ));
178 }
179 return Err(RedDBError::Query(format!(
180 "{label} '{}' already exists",
181 query.name
182 )));
183 }
184
185 store
186 .create_collection(&query.name)
187 .map_err(|err| RedDBError::Internal(err.to_string()))?;
188 if query.collection_model == CollectionModel::Vault {
189 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190 let key_scope = if query.vault_own_master_key {
191 "own"
192 } else {
193 "cluster"
194 };
195 store.set_config_tree(
196 &format!("red.vault.{}.key_scope", query.name),
197 &crate::serde_json::Value::String(key_scope.to_string()),
198 );
199 store.set_config_tree(
200 &format!("red.vault.{}.status", query.name),
201 &crate::serde_json::Value::String("sealed".to_string()),
202 );
203 }
204 if query.collection_model == CollectionModel::Metrics {
205 for spec in &query.metrics_rollup_policies {
206 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207 .ok_or_else(|| {
208 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209 })?;
210 if policy.source != "raw" {
211 return Err(RedDBError::Query(format!(
212 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213 spec
214 )));
215 }
216 if !matches!(
217 policy.aggregation.as_str(),
218 "avg" | "sum" | "min" | "max" | "count"
219 ) {
220 return Err(RedDBError::Query(format!(
221 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222 spec
223 )));
224 }
225 }
226 if let Some(raw_retention_ms) = query.default_ttl_ms {
227 self.inner
228 .db
229 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230 store.set_config_tree(
231 &format!("red.metrics.{}.raw_retention_ms", query.name),
232 &crate::serde_json::Value::Number(raw_retention_ms as f64),
233 );
234 }
235 let tenant_identity = query
236 .tenant_by
237 .clone()
238 .unwrap_or_else(|| "current_tenant".to_string());
239 store.set_config_tree(
240 &format!("red.metrics.{}.tenant_identity", query.name),
241 &crate::serde_json::Value::String(tenant_identity),
242 );
243 store.set_config_tree(
244 &format!("red.metrics.{}.namespace", query.name),
245 &crate::serde_json::Value::String("default".to_string()),
246 );
247 if !query.metrics_rollup_policies.is_empty() {
248 store.set_config_tree(
249 &format!("red.metrics.{}.rollup_policies", query.name),
250 &crate::serde_json::Value::Array(
251 query
252 .metrics_rollup_policies
253 .iter()
254 .cloned()
255 .map(crate::serde_json::Value::String)
256 .collect(),
257 ),
258 );
259 }
260 }
261 let contract = if query.collection_model == CollectionModel::Metrics {
262 metrics_collection_contract(query)
263 } else {
264 keyed_collection_contract(
265 &query.name,
266 query.collection_model,
267 query.analytics_config.clone(),
268 )
269 };
270 self.inner
271 .db
272 .save_collection_contract(contract)
273 .map_err(|err| RedDBError::Internal(err.to_string()))?;
274 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
275 store.set_config_tree(
276 &format!("red.collection_tenants.{}", query.name),
277 &crate::serde_json::Value::String(tenant_id),
278 );
279 }
280 self.inner
281 .db
282 .persist_metadata()
283 .map_err(|err| RedDBError::Internal(err.to_string()))?;
284 self.invalidate_result_cache();
285
286 Ok(RuntimeQueryResult::ok_message(
287 raw_query.to_string(),
288 &format!("{label} '{}' created", query.name),
289 "create",
290 ))
291 }
292
293 pub fn ensure_system_graph_with_analytics(
302 &self,
303 name: &str,
304 outputs: &[crate::catalog::AnalyticsOutput],
305 ) -> RedDBResult<()> {
306 let store = self.inner.db.store();
307 if store.get_collection(name).is_some() {
308 return Ok(());
309 }
310 let analytics_config = outputs
311 .iter()
312 .map(|output| crate::catalog::AnalyticsViewDescriptor {
313 output: *output,
314 algorithm: None,
315 resolution: None,
316 max_iterations: None,
317 tolerance: None,
318 })
319 .collect();
320 store
321 .create_collection(name)
322 .map_err(|err| RedDBError::Internal(err.to_string()))?;
323 let contract = keyed_collection_contract(
324 name,
325 crate::catalog::CollectionModel::Graph,
326 analytics_config,
327 );
328 self.inner
329 .db
330 .save_collection_contract(contract)
331 .map_err(|err| RedDBError::Internal(err.to_string()))?;
332 self.inner
333 .db
334 .persist_metadata()
335 .map_err(|err| RedDBError::Internal(err.to_string()))?;
336 self.invalidate_result_cache_process_only();
337 Ok(())
338 }
339
340 pub fn execute_create_collection(
341 &self,
342 raw_query: &str,
343 query: &CreateCollectionQuery,
344 ) -> RedDBResult<RuntimeQueryResult> {
345 let model = match query.kind.as_str() {
346 "graph" => CollectionModel::Graph,
347 "document" => CollectionModel::Document,
348 "metrics" => CollectionModel::Metrics,
349 "vector.turbo" => {
350 let dimension = query.vector_dimension.ok_or_else(|| {
351 RedDBError::Query(
352 "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
353 )
354 })?;
355 let create = CreateVectorQuery {
356 name: query.name.clone(),
357 dimension,
358 metric: query
359 .vector_metric
360 .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
361 if_not_exists: query.if_not_exists,
362 };
363 let result = self.execute_create_vector(raw_query, &create)?;
364 let store = self.inner.db.store();
371 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
372 self.inner
373 .db
374 .persist_metadata()
375 .map_err(|err| RedDBError::Internal(err.to_string()))?;
376 let _ = self.inner.db.turbo_state(&query.name);
381 return Ok(result);
382 }
383 "blockchain" => CollectionModel::Table,
388 other => {
389 return Err(RedDBError::Query(format!(
390 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
391 )));
392 }
393 };
394 let create = CreateTableQuery {
395 collection_model: model,
396 name: query.name.clone(),
397 columns: Vec::new(),
398 if_not_exists: query.if_not_exists,
399 default_ttl_ms: None,
400 metrics_rollup_policies: Vec::new(),
401 context_index_fields: Vec::new(),
402 context_index_enabled: false,
403 timestamps: false,
404 partition_by: None,
405 tenant_by: None,
406 append_only: false,
407 subscriptions: Vec::new(),
408 analytics_config: Vec::new(),
409 vault_own_master_key: false,
410 };
411 let result = self.execute_create_table(raw_query, &create)?;
412 if query.kind == "blockchain" {
413 self.install_blockchain_kind(&query.name)?;
414 }
415 if !query.allowed_signers.is_empty() {
420 let actor = crate::runtime::impl_core::current_user_projected()
421 .unwrap_or_else(|| "@system/create-collection".to_string());
422 crate::runtime::signed_writes_kind::install(
423 &self.inner.db.store(),
424 &query.name,
425 &query.allowed_signers,
426 &actor,
427 );
428 }
429 Ok(result)
430 }
431
432 fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
436 use crate::runtime::blockchain_kind;
437 use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
438 use std::sync::Arc;
439
440 let store = self.inner.db.store();
441 blockchain_kind::mark_as_chain(&store, name);
442
443 let existing_tip = blockchain_kind::chain_tip(&store, name);
444 if existing_tip.height.is_some() {
445 return Ok(());
446 }
447
448 let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
449 let named: std::collections::HashMap<String, crate::storage::schema::Value> =
450 fields.into_iter().collect();
451 let entity = UnifiedEntity::new(
452 EntityId::new(0),
453 EntityKind::TableRow {
454 table: Arc::from(name),
455 row_id: 0,
456 },
457 EntityData::Row(RowData {
458 columns: Vec::new(),
459 named: Some(named),
460 schema: None,
461 }),
462 );
463 store
464 .insert_auto(name, entity)
465 .map_err(|err| RedDBError::Internal(err.to_string()))?;
466 if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
469 self.inner
470 .chain_tip_cache
471 .lock()
472 .insert(name.to_string(), tip);
473 }
474 Ok(())
475 }
476
477 pub fn execute_create_vector(
478 &self,
479 raw_query: &str,
480 query: &CreateVectorQuery,
481 ) -> RedDBResult<RuntimeQueryResult> {
482 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
483 if is_system_schema_name(&query.name) {
484 return Err(RedDBError::Query("system schema is read-only".to_string()));
485 }
486 let store = self.inner.db.store();
487 if store.get_collection(&query.name).is_some() {
488 if query.if_not_exists {
489 return Ok(RuntimeQueryResult::ok_message(
490 raw_query.to_string(),
491 &format!("vector '{}' already exists", query.name),
492 "create",
493 ));
494 }
495 return Err(RedDBError::Query(format!(
496 "vector '{}' already exists",
497 query.name
498 )));
499 }
500
501 store
502 .create_collection(&query.name)
503 .map_err(|err| RedDBError::Internal(err.to_string()))?;
504 self.inner
505 .db
506 .save_collection_contract(vector_collection_contract(query))
507 .map_err(|err| RedDBError::Internal(err.to_string()))?;
508 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
509 store.set_config_tree(
510 &format!("red.collection_tenants.{}", query.name),
511 &crate::serde_json::Value::String(tenant_id),
512 );
513 }
514 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
523 self.inner
524 .db
525 .persist_metadata()
526 .map_err(|err| RedDBError::Internal(err.to_string()))?;
527 let _ = self.inner.db.turbo_state(&query.name);
532 self.invalidate_result_cache();
533
534 Ok(RuntimeQueryResult::ok_message(
535 raw_query.to_string(),
536 &format!("vector '{}' created", query.name),
537 "create",
538 ))
539 }
540
541 fn provision_vault_key_material(
542 &self,
543 collection: &str,
544 own_master_key: bool,
545 ) -> RedDBResult<()> {
546 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
547 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
548 })?;
549 if !auth_store.is_vault_backed() {
550 return Err(RedDBError::Query(
551 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
552 ));
553 }
554
555 if auth_store.vault_secret_key().is_none() {
556 let key = crate::auth::store::random_bytes(32);
557 auth_store
558 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
559 .map_err(|err| RedDBError::Query(err.to_string()))?;
560 }
561
562 if own_master_key {
563 let key = crate::auth::store::random_bytes(32);
564 auth_store
565 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
566 .map_err(|err| RedDBError::Query(err.to_string()))?;
567 }
568
569 Ok(())
570 }
571
572 pub fn execute_drop_table(
576 &self,
577 raw_query: &str,
578 query: &DropTableQuery,
579 ) -> RedDBResult<RuntimeQueryResult> {
580 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
581 let store = self.inner.db.store();
582
583 if is_system_schema_name(&query.name) {
584 return Err(RedDBError::Query("system schema is read-only".to_string()));
585 }
586
587 let exists = store.get_collection(&query.name).is_some();
588 if !exists {
589 if query.if_exists {
590 return Ok(RuntimeQueryResult::ok_message(
591 raw_query.to_string(),
592 &format!("table '{}' does not exist", query.name),
593 "drop",
594 ));
595 }
596 return Err(RedDBError::NotFound(format!(
597 "table '{}' not found",
598 query.name
599 )));
600 }
601 let actual =
602 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
603 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
604
605 let final_count = store
608 .get_collection(&query.name)
609 .map(|manager| manager.query_all(|_| true).len() as u64)
610 .unwrap_or(0);
611 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
612 self,
613 &query.name,
614 final_count,
615 )?;
616
617 let orphaned_indices: Vec<String> = self
618 .inner
619 .index_store
620 .list_indices(&query.name)
621 .into_iter()
622 .map(|index| index.name)
623 .collect();
624 for name in &orphaned_indices {
625 self.inner.index_store.drop_index(name, &query.name);
626 }
627
628 store
629 .drop_collection(&query.name)
630 .map_err(|err| RedDBError::Internal(err.to_string()))?;
631 self.inner.db.invalidate_vector_index(&query.name);
632 self.inner.db.clear_collection_default_ttl_ms(&query.name);
633 self.inner
634 .db
635 .remove_collection_contract(&query.name)
636 .map_err(|err| RedDBError::Internal(err.to_string()))?;
637 self.clear_table_planner_stats(&query.name);
638 self.invalidate_result_cache();
639 if let Some(store) = self.inner.auth_store.read().clone() {
643 store.invalidate_visible_collections_cache();
644 }
645 self.inner
646 .db
647 .persist_metadata()
648 .map_err(|err| RedDBError::Internal(err.to_string()))?;
649 self.schema_vocabulary_apply(
655 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
656 collection: query.name.clone(),
657 },
658 );
659
660 Ok(RuntimeQueryResult::ok_message(
661 raw_query.to_string(),
662 &format!("table '{}' dropped", query.name),
663 "drop",
664 ))
665 }
666
667 pub fn execute_drop_graph(
668 &self,
669 raw_query: &str,
670 query: &DropGraphQuery,
671 ) -> RedDBResult<RuntimeQueryResult> {
672 self.execute_drop_typed_collection(
673 raw_query,
674 &query.name,
675 query.if_exists,
676 CollectionModel::Graph,
677 "graph",
678 )
679 }
680
681 pub fn execute_drop_vector(
682 &self,
683 raw_query: &str,
684 query: &DropVectorQuery,
685 ) -> RedDBResult<RuntimeQueryResult> {
686 self.execute_drop_typed_collection(
687 raw_query,
688 &query.name,
689 query.if_exists,
690 CollectionModel::Vector,
691 "vector",
692 )
693 }
694
695 pub fn execute_drop_document(
696 &self,
697 raw_query: &str,
698 query: &DropDocumentQuery,
699 ) -> RedDBResult<RuntimeQueryResult> {
700 self.execute_drop_typed_collection(
701 raw_query,
702 &query.name,
703 query.if_exists,
704 CollectionModel::Document,
705 "document",
706 )
707 }
708
709 pub fn execute_drop_kv(
710 &self,
711 raw_query: &str,
712 query: &DropKvQuery,
713 ) -> RedDBResult<RuntimeQueryResult> {
714 let label = polymorphic_resolver::model_name(query.model);
715 self.execute_drop_typed_collection(
716 raw_query,
717 &query.name,
718 query.if_exists,
719 query.model,
720 label,
721 )
722 }
723
724 pub fn execute_drop_collection(
725 &self,
726 raw_query: &str,
727 query: &DropCollectionQuery,
728 ) -> RedDBResult<RuntimeQueryResult> {
729 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
730 if is_system_schema_name(&query.name) {
731 return Err(RedDBError::Query("system schema is read-only".to_string()));
732 }
733 let store = self.inner.db.store();
734 if store.get_collection(&query.name).is_none() {
735 if query.if_exists {
736 return Ok(RuntimeQueryResult::ok_message(
737 raw_query.to_string(),
738 &format!("collection '{}' does not exist", query.name),
739 "drop",
740 ));
741 }
742 return Err(RedDBError::NotFound(format!(
743 "collection '{}' not found",
744 query.name
745 )));
746 }
747
748 let actual =
749 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
750 if let Some(expected) = query.model {
751 polymorphic_resolver::ensure_model_match(expected, actual)?;
752 }
753
754 match actual {
755 CollectionModel::Table => self.execute_drop_table(
756 raw_query,
757 &DropTableQuery {
758 name: query.name.clone(),
759 if_exists: query.if_exists,
760 },
761 ),
762 CollectionModel::TimeSeries => self.execute_drop_timeseries(
763 raw_query,
764 &DropTimeSeriesQuery {
765 name: query.name.clone(),
766 if_exists: query.if_exists,
767 },
768 ),
769 CollectionModel::Queue => self.execute_drop_queue(
770 raw_query,
771 &DropQueueQuery {
772 name: query.name.clone(),
773 if_exists: query.if_exists,
774 },
775 ),
776 CollectionModel::Graph => self.execute_drop_graph(
777 raw_query,
778 &DropGraphQuery {
779 name: query.name.clone(),
780 if_exists: query.if_exists,
781 },
782 ),
783 CollectionModel::Vector => self.execute_drop_vector(
784 raw_query,
785 &DropVectorQuery {
786 name: query.name.clone(),
787 if_exists: query.if_exists,
788 },
789 ),
790 CollectionModel::Document => self.execute_drop_document(
791 raw_query,
792 &DropDocumentQuery {
793 name: query.name.clone(),
794 if_exists: query.if_exists,
795 },
796 ),
797 CollectionModel::Kv => self.execute_drop_kv(
798 raw_query,
799 &DropKvQuery {
800 name: query.name.clone(),
801 if_exists: query.if_exists,
802 model: CollectionModel::Kv,
803 },
804 ),
805 CollectionModel::Config => self.execute_drop_kv(
806 raw_query,
807 &DropKvQuery {
808 name: query.name.clone(),
809 if_exists: query.if_exists,
810 model: CollectionModel::Config,
811 },
812 ),
813 CollectionModel::Vault => self.execute_drop_kv(
814 raw_query,
815 &DropKvQuery {
816 name: query.name.clone(),
817 if_exists: query.if_exists,
818 model: CollectionModel::Vault,
819 },
820 ),
821 CollectionModel::Hll => self.execute_probabilistic_command(
822 raw_query,
823 &ProbabilisticCommand::DropHll {
824 name: query.name.clone(),
825 if_exists: query.if_exists,
826 },
827 ),
828 CollectionModel::Sketch => self.execute_probabilistic_command(
829 raw_query,
830 &ProbabilisticCommand::DropSketch {
831 name: query.name.clone(),
832 if_exists: query.if_exists,
833 },
834 ),
835 CollectionModel::Filter => self.execute_probabilistic_command(
836 raw_query,
837 &ProbabilisticCommand::DropFilter {
838 name: query.name.clone(),
839 if_exists: query.if_exists,
840 },
841 ),
842 CollectionModel::Metrics => self.execute_drop_typed_collection(
843 raw_query,
844 &query.name,
845 query.if_exists,
846 CollectionModel::Metrics,
847 "metrics",
848 ),
849 CollectionModel::Mixed => self.execute_drop_typed_collection(
850 raw_query,
851 &query.name,
852 query.if_exists,
853 CollectionModel::Mixed,
854 "collection",
855 ),
856 }
857 }
858
859 fn require_graph_for_analytics(&self, name: &str) -> RedDBResult<()> {
865 let is_graph = self
866 .inner
867 .db
868 .collection_contract(name)
869 .map(|c| c.declared_model == crate::catalog::CollectionModel::Graph)
870 .unwrap_or(false);
871 if !is_graph {
872 return Err(RedDBError::Query(format!(
873 "ALTER GRAPH ... ANALYTICS: '{name}' is not a graph collection"
874 )));
875 }
876 Ok(())
877 }
878
879 pub fn execute_alter_table(
885 &self,
886 raw_query: &str,
887 query: &AlterTableQuery,
888 ) -> RedDBResult<RuntimeQueryResult> {
889 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
890 let store = self.inner.db.store();
891
892 if store.get_collection(&query.name).is_none() {
894 return Err(RedDBError::NotFound(format!(
895 "table '{}' not found",
896 query.name
897 )));
898 }
899
900 let mut messages = Vec::new();
901
902 let fields_added: Vec<String> = query
904 .operations
905 .iter()
906 .filter_map(|op| {
907 if let AlterOperation::AddColumn(col) = op {
908 Some(col.name.clone())
909 } else {
910 None
911 }
912 })
913 .collect();
914 let fields_removed: Vec<String> = query
915 .operations
916 .iter()
917 .filter_map(|op| {
918 if let AlterOperation::DropColumn(name) = op {
919 Some(name.clone())
920 } else {
921 None
922 }
923 })
924 .collect();
925
926 for op in &query.operations {
927 match op {
928 AlterOperation::AddColumn(col) => {
929 messages.push(format!("column '{}' added", col.name));
931 }
932 AlterOperation::DropColumn(name) => {
933 messages.push(format!("column '{}' dropped", name));
934 }
935 AlterOperation::RenameColumn { from, to } => {
936 messages.push(format!("column '{}' renamed to '{}'", from, to));
937 }
938 AlterOperation::AttachPartition { child, bound } => {
939 store.set_config_tree(
943 &format!("partition.{}.children.{}", query.name, child),
944 &crate::serde_json::Value::String(bound.clone()),
945 );
946 messages.push(format!(
947 "partition '{child}' attached to '{}' ({bound})",
948 query.name
949 ));
950 }
951 AlterOperation::DetachPartition { child } => {
952 store.set_config_tree(
953 &format!("partition.{}.children.{}", query.name, child),
954 &crate::serde_json::Value::Null,
955 );
956 messages.push(format!(
957 "partition '{child}' detached from '{}'",
958 query.name
959 ));
960 }
961 AlterOperation::EnableRowLevelSecurity => {
962 self.inner
963 .rls_enabled_tables
964 .write()
965 .insert(query.name.clone());
966 store.set_config_tree(
968 &format!("rls.enabled.{}", query.name),
969 &crate::serde_json::Value::Bool(true),
970 );
971 self.invalidate_plan_cache();
972 messages.push(format!("row level security enabled on '{}'", query.name));
973 }
974 AlterOperation::DisableRowLevelSecurity => {
975 self.inner.rls_enabled_tables.write().remove(&query.name);
976 store.set_config_tree(
977 &format!("rls.enabled.{}", query.name),
978 &crate::serde_json::Value::Null,
979 );
980 self.invalidate_plan_cache();
981 messages.push(format!("row level security disabled on '{}'", query.name));
982 }
983 AlterOperation::EnableTenancy { column } => {
985 store.set_config_tree(
986 &format!("tenant_tables.{}.column", query.name),
987 &crate::serde_json::Value::String(column.clone()),
988 );
989 self.register_tenant_table(&query.name, column);
990 self.invalidate_plan_cache();
991 messages.push(format!(
992 "tenancy enabled on '{}' by column '{column}'",
993 query.name
994 ));
995 }
996 AlterOperation::DisableTenancy => {
997 store.set_config_tree(
998 &format!("tenant_tables.{}.column", query.name),
999 &crate::serde_json::Value::Null,
1000 );
1001 self.unregister_tenant_table(&query.name);
1002 self.invalidate_plan_cache();
1003 messages.push(format!("tenancy disabled on '{}'", query.name));
1004 }
1005 AlterOperation::SetAppendOnly(on) => {
1006 messages.push(format!(
1011 "append_only {} on '{}'",
1012 if *on { "enabled" } else { "disabled" },
1013 query.name
1014 ));
1015 }
1016 AlterOperation::SetVersioned(on) => {
1017 self.vcs_set_versioned(&query.name, *on)?;
1024 messages.push(format!(
1025 "versioned {} on '{}'",
1026 if *on { "enabled" } else { "disabled" },
1027 query.name
1028 ));
1029 }
1030 AlterOperation::EnableEvents(subscription) => {
1031 let mut subscription = subscription.clone();
1032 subscription.source = query.name.clone();
1033 validate_event_subscriptions(
1034 self,
1035 &query.name,
1036 std::slice::from_ref(&subscription),
1037 )?;
1038 ensure_event_target_queue(self, &subscription.target_queue)?;
1039 messages.push(format!(
1040 "events enabled on '{}' to '{}'",
1041 query.name, subscription.target_queue
1042 ));
1043 }
1044 AlterOperation::DisableEvents => {
1045 messages.push(format!("events disabled on '{}'", query.name));
1046 }
1047 AlterOperation::AddSubscription { name, descriptor } => {
1048 let mut sub = descriptor.clone();
1049 sub.name = name.clone();
1050 sub.source = query.name.clone();
1051 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
1052 ensure_event_target_queue(self, &sub.target_queue)?;
1053 messages.push(format!(
1054 "subscription '{}' added on '{}' to '{}'",
1055 name, query.name, sub.target_queue
1056 ));
1057 }
1058 AlterOperation::DropSubscription { name } => {
1059 messages.push(format!(
1060 "subscription '{}' dropped on '{}'",
1061 name, query.name
1062 ));
1063 }
1064 AlterOperation::AddSigner { pubkey } => {
1065 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1072 return Err(RedDBError::Query(format!(
1073 "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
1074 recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
1075 query.name
1076 )));
1077 }
1078 let actor = crate::runtime::impl_core::current_user_projected()
1079 .unwrap_or_else(|| "@system/alter".to_string());
1080 let changed = crate::runtime::signed_writes_kind::add_signer(
1081 &store,
1082 &query.name,
1083 *pubkey,
1084 &actor,
1085 );
1086 messages.push(format!(
1087 "signer {} on '{}'",
1088 if changed { "added" } else { "already present" },
1089 query.name
1090 ));
1091 }
1092 AlterOperation::RevokeSigner { pubkey } => {
1093 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1094 return Err(RedDBError::Query(format!(
1095 "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1096 query.name
1097 )));
1098 }
1099 let actor = crate::runtime::impl_core::current_user_projected()
1100 .unwrap_or_else(|| "@system/alter".to_string());
1101 let changed = crate::runtime::signed_writes_kind::revoke_signer(
1102 &store,
1103 &query.name,
1104 pubkey,
1105 &actor,
1106 );
1107 messages.push(format!(
1108 "signer {} on '{}'",
1109 if changed {
1110 "revoked"
1111 } else {
1112 "already revoked"
1113 },
1114 query.name
1115 ));
1116 }
1117 AlterOperation::SetRetention { duration_ms } => {
1118 let existing = self.inner.db.collection_contract(&query.name);
1124 let has_ts_column = existing
1125 .as_ref()
1126 .map(retention_timestamp_column_exists)
1127 .unwrap_or(false);
1128 if !has_ts_column {
1129 return Err(RedDBError::Query(format!(
1130 "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1131 column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1132 or enable WITH timestamps = true before setting a \
1133 retention policy",
1134 query.name
1135 )));
1136 }
1137 messages.push(format!(
1138 "retention set to {duration_ms} ms on '{}'",
1139 query.name
1140 ));
1141 }
1142 AlterOperation::UnsetRetention => {
1143 messages.push(format!("retention cleared on '{}'", query.name));
1144 }
1145 AlterOperation::AddAnalytics(views) => {
1146 self.require_graph_for_analytics(&query.name)?;
1151 let existing = self.inner.db.collection_contract(&query.name);
1152 let enabled: std::collections::BTreeSet<crate::catalog::AnalyticsOutput> =
1153 existing
1154 .as_ref()
1155 .map(|c| c.analytics_config.iter().map(|v| v.output).collect())
1156 .unwrap_or_default();
1157 for view in views {
1158 if enabled.contains(&view.output) {
1159 messages.push(format!(
1162 "analytics '{}' already enabled on '{}'",
1163 view.output.as_str(),
1164 query.name
1165 ));
1166 } else {
1167 messages.push(format!(
1168 "analytics '{}' enabled on '{}'",
1169 view.output.as_str(),
1170 query.name
1171 ));
1172 }
1173 }
1174 }
1175 AlterOperation::DropAnalytics(output) => {
1176 self.require_graph_for_analytics(&query.name)?;
1177 let enabled = self
1178 .inner
1179 .db
1180 .collection_contract(&query.name)
1181 .map(|c| c.analytics_config.iter().any(|v| v.output == *output))
1182 .unwrap_or(false);
1183 if !enabled {
1184 return Err(RedDBError::Query(format!(
1187 "ALTER GRAPH DROP ANALYTICS: analytics output '{}' is not enabled on graph '{}'",
1188 output.as_str(),
1189 query.name
1190 )));
1191 }
1192 messages.push(format!(
1193 "analytics '{}' disabled on '{}'",
1194 output.as_str(),
1195 query.name
1196 ));
1197 }
1198 }
1199 }
1200
1201 let mut contract = self
1202 .inner
1203 .db
1204 .collection_contract(&query.name)
1205 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1206 apply_alter_operations_to_contract(&mut contract, &query.operations);
1207 contract.version = contract.version.saturating_add(1);
1208 contract.updated_at_unix_ms = current_unix_ms();
1209 self.inner
1210 .db
1211 .save_collection_contract(contract)
1212 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1213 if !fields_added.is_empty() || !fields_removed.is_empty() {
1217 let sub_names: Vec<String> = self
1218 .inner
1219 .db
1220 .collection_contract(&query.name)
1221 .map(|c| {
1222 c.subscriptions
1223 .iter()
1224 .filter(|s| s.enabled)
1225 .map(|s| s.name.clone())
1226 .collect()
1227 })
1228 .unwrap_or_default();
1229 if !sub_names.is_empty() {
1230 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1231 collection: query.name.clone(),
1232 subscription_names: sub_names.join(", "),
1233 fields_added: fields_added.join(", "),
1234 fields_removed: fields_removed.join(", "),
1235 lsn: self.cdc_current_lsn(),
1236 }
1237 .emit_global();
1238 }
1239 }
1240
1241 self.clear_table_planner_stats(&query.name);
1242 self.invalidate_result_cache();
1243 let post_alter_columns: Vec<String> = self
1248 .inner
1249 .db
1250 .collection_contract(&query.name)
1251 .map(|contract| {
1252 contract
1253 .declared_columns
1254 .iter()
1255 .map(|col| col.name.clone())
1256 .collect()
1257 })
1258 .unwrap_or_default();
1259 self.schema_vocabulary_apply(
1260 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1261 collection: query.name.clone(),
1262 columns: post_alter_columns,
1263 type_tags: Vec::new(),
1264 description: None,
1265 },
1266 );
1267
1268 let message = if messages.is_empty() {
1269 format!("table '{}' altered (no operations)", query.name)
1270 } else {
1271 format!("table '{}' altered: {}", query.name, messages.join(", "))
1272 };
1273
1274 Ok(RuntimeQueryResult::ok_message(
1275 raw_query.to_string(),
1276 &message,
1277 "alter",
1278 ))
1279 }
1280
1281 pub fn execute_explain_alter(
1288 &self,
1289 raw_query: &str,
1290 query: &ExplainAlterQuery,
1291 ) -> RedDBResult<RuntimeQueryResult> {
1292 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1296
1297 let current_contract = self.inner.db.collection_contract(&query.target.name);
1298
1299 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1300 .as_ref()
1301 .map(|c| c.declared_columns.clone())
1302 .unwrap_or_default();
1303
1304 let diff = super::schema_diff::compute_column_diff(
1305 &query.target.name,
1306 ¤t_columns,
1307 &query.target.columns,
1308 );
1309
1310 let rendered = match query.format {
1311 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1312 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1313 };
1314
1315 let format_label = match query.format {
1316 ExplainFormat::Sql => "sql",
1317 ExplainFormat::Json => "json",
1318 };
1319
1320 let columns = vec![
1321 "table".to_string(),
1322 "format".to_string(),
1323 "diff".to_string(),
1324 ];
1325 let row = vec![
1326 ("table".to_string(), Value::text(query.target.name.clone())),
1327 ("format".to_string(), Value::text(format_label.to_string())),
1328 ("diff".to_string(), Value::text(rendered)),
1329 ];
1330
1331 Ok(RuntimeQueryResult::ok_records(
1332 raw_query.to_string(),
1333 columns,
1334 vec![row],
1335 "explain",
1336 ))
1337 }
1338
1339 pub fn execute_create_index(
1344 &self,
1345 raw_query: &str,
1346 query: &CreateIndexQuery,
1347 ) -> RedDBResult<RuntimeQueryResult> {
1348 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1349 let store = self.inner.db.store();
1350
1351 let manager = store
1353 .get_collection(&query.table)
1354 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1355
1356 let method_kind = match query.method {
1357 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1358 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1359 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1360 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1361 };
1362
1363 let entities = manager.query_all(|_| true);
1373 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1374 entities
1375 .iter()
1376 .map(|e| {
1377 let fields = match &e.data {
1378 crate::storage::EntityData::Row(row) => {
1379 if let Some(ref named) = row.named {
1380 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1381 } else if let Some(ref schema) = row.schema {
1382 schema
1386 .iter()
1387 .zip(row.columns.iter())
1388 .map(|(k, v)| (k.clone(), v.clone()))
1389 .collect()
1390 } else {
1391 Vec::new()
1392 }
1393 }
1394 crate::storage::EntityData::Node(node) => node
1395 .properties
1396 .iter()
1397 .map(|(k, v)| (k.clone(), v.clone()))
1398 .collect(),
1399 _ => Vec::new(),
1400 };
1401 (e.id, fields)
1402 })
1403 .collect();
1404
1405 let indexed_count = self
1407 .inner
1408 .index_store
1409 .create_index(
1410 &query.name,
1411 &query.table,
1412 &query.columns,
1413 method_kind,
1414 query.unique,
1415 &entity_fields,
1416 )
1417 .map_err(RedDBError::Internal)?;
1418
1419 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1420 &query.table,
1421 &entity_fields,
1422 );
1423 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1424 self.invalidate_plan_cache();
1425
1426 self.inner
1428 .index_store
1429 .register(super::index_store::RegisteredIndex {
1430 name: query.name.clone(),
1431 collection: query.table.clone(),
1432 columns: query.columns.clone(),
1433 method: method_kind,
1434 unique: query.unique,
1435 });
1436 self.persist_runtime_index_descriptor(super::index_store::RegisteredIndex {
1437 name: query.name.clone(),
1438 collection: query.table.clone(),
1439 columns: query.columns.clone(),
1440 method: method_kind,
1441 unique: query.unique,
1442 })?;
1443 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1447 collection: query.table.clone(),
1448 index: query.name.clone(),
1449 columns: query.columns.clone(),
1450 });
1451
1452 let method_str = format!("{}", query.method);
1453 let unique_str = if query.unique { "unique " } else { "" };
1454 let cols = query.columns.join(", ");
1455
1456 Ok(RuntimeQueryResult::ok_message(
1457 raw_query.to_string(),
1458 &format!(
1459 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1460 unique_str, query.name, query.table, cols, method_str, indexed_count
1461 ),
1462 "create",
1463 ))
1464 }
1465
1466 pub fn execute_drop_index(
1470 &self,
1471 raw_query: &str,
1472 query: &DropIndexQuery,
1473 ) -> RedDBResult<RuntimeQueryResult> {
1474 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1475 let store = self.inner.db.store();
1476
1477 if store.get_collection(&query.table).is_none() {
1479 if query.if_exists {
1480 return Ok(RuntimeQueryResult::ok_message(
1481 raw_query.to_string(),
1482 &format!("table '{}' does not exist", query.table),
1483 "drop",
1484 ));
1485 }
1486 return Err(RedDBError::NotFound(format!(
1487 "table '{}' not found",
1488 query.table
1489 )));
1490 }
1491
1492 self.inner.index_store.drop_index(&query.name, &query.table);
1494 self.persist_runtime_index_drop(&query.table, &query.name)?;
1495 self.invalidate_plan_cache();
1496 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1498 collection: query.table.clone(),
1499 index: query.name.clone(),
1500 });
1501
1502 Ok(RuntimeQueryResult::ok_message(
1503 raw_query.to_string(),
1504 &format!("index '{}' dropped from '{}'", query.name, query.table),
1505 "drop",
1506 ))
1507 }
1508
1509 fn execute_drop_typed_collection(
1510 &self,
1511 raw_query: &str,
1512 name: &str,
1513 if_exists: bool,
1514 expected_model: CollectionModel,
1515 label: &str,
1516 ) -> RedDBResult<RuntimeQueryResult> {
1517 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1518 if is_system_schema_name(name) {
1519 return Err(RedDBError::Query("system schema is read-only".to_string()));
1520 }
1521 let store = self.inner.db.store();
1522 if store.get_collection(name).is_none() {
1523 if if_exists {
1524 return Ok(RuntimeQueryResult::ok_message(
1525 raw_query.to_string(),
1526 &format!("{label} '{name}' does not exist"),
1527 "drop",
1528 ));
1529 }
1530 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1531 }
1532
1533 let actual = self
1534 .inner
1535 .db
1536 .collection_contract(name)
1537 .map(|contract| contract.declared_model)
1538 .map(Ok)
1539 .unwrap_or_else(|| {
1540 polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())
1541 })?;
1542 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1543 self.drop_collection_storage(raw_query, name, label)
1544 }
1545
1546 pub fn execute_truncate(
1547 &self,
1548 raw_query: &str,
1549 query: &TruncateQuery,
1550 ) -> RedDBResult<RuntimeQueryResult> {
1551 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1552 if is_system_schema_name(&query.name) {
1553 return Err(RedDBError::Query("system schema is read-only".to_string()));
1554 }
1555
1556 let label = query
1557 .model
1558 .map(polymorphic_resolver::model_name)
1559 .unwrap_or("collection");
1560 let store = self.inner.db.store();
1561 if store.get_collection(&query.name).is_none() {
1562 if query.if_exists {
1563 return Ok(RuntimeQueryResult::ok_message(
1564 raw_query.to_string(),
1565 &format!("{label} '{}' does not exist", query.name),
1566 "truncate",
1567 ));
1568 }
1569 return Err(RedDBError::NotFound(format!(
1570 "{label} '{}' not found",
1571 query.name
1572 )));
1573 }
1574
1575 let actual =
1576 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1577 if let Some(expected) = query.model {
1578 polymorphic_resolver::ensure_model_match(expected, actual)?;
1579 }
1580
1581 if actual == CollectionModel::Queue {
1582 return self.execute_queue_command(
1583 raw_query,
1584 &QueueCommand::Purge {
1585 queue: query.name.clone(),
1586 },
1587 );
1588 }
1589
1590 let affected = self.truncate_collection_entities(&query.name)?;
1592 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1594 self.inner.db.invalidate_vector_index(&query.name);
1595 self.clear_table_planner_stats(&query.name);
1596 self.invalidate_result_cache();
1597
1598 Ok(RuntimeQueryResult::ok_message(
1599 raw_query.to_string(),
1600 &format!(
1601 "{affected} entities truncated from {label} '{}'",
1602 query.name
1603 ),
1604 "truncate",
1605 ))
1606 }
1607
1608 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1609 let store = self.inner.db.store();
1610 let Some(manager) = store.get_collection(name) else {
1611 return Ok(0);
1612 };
1613 let entities = manager.query_all(|_| true);
1614 if entities.is_empty() {
1615 return Ok(0);
1616 }
1617
1618 for entity in &entities {
1619 let fields = entity_index_fields(&entity.data);
1620 self.inner
1621 .index_store
1622 .index_entity_delete(name, entity.id, &fields)
1623 .map_err(RedDBError::Internal)?;
1624 }
1625
1626 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1627 let deleted_ids = store
1628 .delete_batch(name, &ids)
1629 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1630 for id in &deleted_ids {
1631 store.context_index().remove_entity(*id);
1632 }
1633 Ok(deleted_ids.len() as u64)
1634 }
1635
1636 fn drop_collection_storage(
1637 &self,
1638 raw_query: &str,
1639 name: &str,
1640 label: &str,
1641 ) -> RedDBResult<RuntimeQueryResult> {
1642 let store = self.inner.db.store();
1643
1644 let final_count = store
1647 .get_collection(name)
1648 .map(|manager| manager.query_all(|_| true).len() as u64)
1649 .unwrap_or(0);
1650 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1651 self,
1652 name,
1653 final_count,
1654 )?;
1655
1656 let orphaned_indices: Vec<String> = self
1657 .inner
1658 .index_store
1659 .list_indices(name)
1660 .into_iter()
1661 .map(|index| index.name)
1662 .collect();
1663 for index_name in &orphaned_indices {
1664 self.inner.index_store.drop_index(index_name, name);
1665 }
1666
1667 store
1668 .drop_collection(name)
1669 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1670 self.inner.db.invalidate_vector_index(name);
1671 self.inner.db.clear_collection_default_ttl_ms(name);
1672 self.inner
1673 .db
1674 .remove_collection_contract(name)
1675 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1676 self.clear_table_planner_stats(name);
1677 self.invalidate_result_cache();
1678 if let Some(store) = self.inner.auth_store.read().clone() {
1679 store.invalidate_visible_collections_cache();
1680 }
1681 self.inner
1682 .db
1683 .persist_metadata()
1684 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1685 self.schema_vocabulary_apply(
1686 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1687 collection: name.to_string(),
1688 },
1689 );
1690
1691 Ok(RuntimeQueryResult::ok_message(
1692 raw_query.to_string(),
1693 &format!("{label} '{name}' dropped"),
1694 "drop",
1695 ))
1696 }
1697}
1698
1699pub(crate) fn is_system_schema_name(name: &str) -> bool {
1700 name == "red" || name.starts_with("red.") || name.starts_with("__red_schema_")
1701}
1702
1703fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1704 match data {
1705 EntityData::Row(row) => {
1706 if let Some(ref named) = row.named {
1707 named
1708 .iter()
1709 .map(|(key, value)| (key.clone(), value.clone()))
1710 .collect()
1711 } else if let Some(ref schema) = row.schema {
1712 schema
1713 .iter()
1714 .zip(row.columns.iter())
1715 .map(|(key, value)| (key.clone(), value.clone()))
1716 .collect()
1717 } else {
1718 Vec::new()
1719 }
1720 }
1721 EntityData::Node(node) => node
1722 .properties
1723 .iter()
1724 .map(|(key, value)| (key.clone(), value.clone()))
1725 .collect(),
1726 _ => Vec::new(),
1727 }
1728}
1729
1730fn collection_contract_from_create_table(
1731 query: &CreateTableQuery,
1732) -> RedDBResult<crate::physical::CollectionContract> {
1733 let now = current_unix_ms();
1734 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1735 .columns
1736 .iter()
1737 .map(declared_column_contract_from_ddl)
1738 .collect();
1739 if query.timestamps {
1740 declared_columns.push(crate::physical::DeclaredColumnContract {
1744 name: "created_at".to_string(),
1745 data_type: "BIGINT".to_string(),
1746 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1747 not_null: true,
1748 default: None,
1749 compress: None,
1750 unique: false,
1751 primary_key: false,
1752 enum_variants: Vec::new(),
1753 array_element: None,
1754 decimal_precision: None,
1755 });
1756 declared_columns.push(crate::physical::DeclaredColumnContract {
1757 name: "updated_at".to_string(),
1758 data_type: "BIGINT".to_string(),
1759 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1760 not_null: true,
1761 default: None,
1762 compress: None,
1763 unique: false,
1764 primary_key: false,
1765 enum_variants: Vec::new(),
1766 array_element: None,
1767 decimal_precision: None,
1768 });
1769 }
1770 Ok(crate::physical::CollectionContract {
1771 name: query.name.clone(),
1772 declared_model: crate::catalog::CollectionModel::Table,
1773 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1774 origin: crate::physical::ContractOrigin::Explicit,
1775 version: 1,
1776 created_at_unix_ms: now,
1777 updated_at_unix_ms: now,
1778 default_ttl_ms: query.default_ttl_ms,
1779 vector_dimension: None,
1780 vector_metric: None,
1781 context_index_fields: query.context_index_fields.clone(),
1782 declared_columns,
1783 table_def: Some(build_table_def_from_create_table(query)?),
1784 timestamps_enabled: query.timestamps,
1785 context_index_enabled: query.context_index_enabled
1786 || !query.context_index_fields.is_empty(),
1787 metrics_raw_retention_ms: None,
1788 metrics_rollup_policies: Vec::new(),
1789 metrics_tenant_identity: None,
1790 metrics_namespace: None,
1791 append_only: query.append_only,
1792 subscriptions: query.subscriptions.clone(),
1793 analytics_config: Vec::new(),
1794 session_key: None,
1795 session_gap_ms: None,
1796 retention_duration_ms: None,
1797 analytical_storage: None,
1798 })
1799}
1800
1801fn default_collection_contract_for_existing_table(
1802 name: &str,
1803) -> crate::physical::CollectionContract {
1804 let now = current_unix_ms();
1805 crate::physical::CollectionContract {
1806 name: name.to_string(),
1807 declared_model: crate::catalog::CollectionModel::Table,
1808 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1809 origin: crate::physical::ContractOrigin::Explicit,
1810 version: 0,
1811 created_at_unix_ms: now,
1812 updated_at_unix_ms: now,
1813 default_ttl_ms: None,
1814 vector_dimension: None,
1815 vector_metric: None,
1816 context_index_fields: Vec::new(),
1817 declared_columns: Vec::new(),
1818 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1819 timestamps_enabled: false,
1820 context_index_enabled: false,
1821 metrics_raw_retention_ms: None,
1822 metrics_rollup_policies: Vec::new(),
1823 metrics_tenant_identity: None,
1824 metrics_namespace: None,
1825 append_only: false,
1826 subscriptions: Vec::new(),
1827 analytics_config: Vec::new(),
1828 session_key: None,
1829 session_gap_ms: None,
1830 retention_duration_ms: None,
1831 analytical_storage: None,
1832 }
1833}
1834
1835fn keyed_collection_contract(
1836 name: &str,
1837 model: crate::catalog::CollectionModel,
1838 analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
1839) -> crate::physical::CollectionContract {
1840 let now = current_unix_ms();
1841 crate::physical::CollectionContract {
1842 name: name.to_string(),
1843 declared_model: model,
1844 schema_mode: crate::catalog::SchemaMode::Dynamic,
1845 origin: crate::physical::ContractOrigin::Explicit,
1846 version: 1,
1847 created_at_unix_ms: now,
1848 updated_at_unix_ms: now,
1849 default_ttl_ms: None,
1850 vector_dimension: None,
1851 vector_metric: None,
1852 context_index_fields: Vec::new(),
1853 declared_columns: Vec::new(),
1854 table_def: None,
1855 timestamps_enabled: false,
1856 context_index_enabled: false,
1857 metrics_raw_retention_ms: None,
1858 metrics_rollup_policies: Vec::new(),
1859 metrics_tenant_identity: None,
1860 metrics_namespace: None,
1861 append_only: false,
1862 subscriptions: Vec::new(),
1863 analytics_config,
1864 session_key: None,
1865 session_gap_ms: None,
1866 retention_duration_ms: None,
1867 analytical_storage: None,
1868 }
1869}
1870
1871fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1872 let now = current_unix_ms();
1873 crate::physical::CollectionContract {
1874 name: query.name.clone(),
1875 declared_model: crate::catalog::CollectionModel::Metrics,
1876 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1877 origin: crate::physical::ContractOrigin::Explicit,
1878 version: 1,
1879 created_at_unix_ms: now,
1880 updated_at_unix_ms: now,
1881 default_ttl_ms: query.default_ttl_ms,
1882 vector_dimension: None,
1883 vector_metric: None,
1884 context_index_fields: Vec::new(),
1885 declared_columns: Vec::new(),
1886 table_def: None,
1887 timestamps_enabled: false,
1888 context_index_enabled: false,
1889 metrics_raw_retention_ms: query.default_ttl_ms,
1890 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1891 metrics_tenant_identity: Some(
1892 query
1893 .tenant_by
1894 .clone()
1895 .unwrap_or_else(|| "current_tenant".to_string()),
1896 ),
1897 metrics_namespace: Some("default".to_string()),
1898 append_only: true,
1899 subscriptions: Vec::new(),
1900 analytics_config: Vec::new(),
1901 session_key: None,
1902 session_gap_ms: None,
1903 retention_duration_ms: None,
1904 analytical_storage: None,
1905 }
1906}
1907
1908fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1909 let now = current_unix_ms();
1910 crate::physical::CollectionContract {
1911 name: query.name.clone(),
1912 declared_model: crate::catalog::CollectionModel::Vector,
1913 schema_mode: crate::catalog::SchemaMode::Dynamic,
1914 origin: crate::physical::ContractOrigin::Explicit,
1915 version: 1,
1916 created_at_unix_ms: now,
1917 updated_at_unix_ms: now,
1918 default_ttl_ms: None,
1919 vector_dimension: Some(query.dimension),
1920 vector_metric: Some(query.metric),
1921 context_index_fields: Vec::new(),
1922 declared_columns: Vec::new(),
1923 table_def: None,
1924 timestamps_enabled: false,
1925 context_index_enabled: false,
1926 metrics_raw_retention_ms: None,
1927 metrics_rollup_policies: Vec::new(),
1928 metrics_tenant_identity: None,
1929 metrics_namespace: None,
1930 append_only: false,
1931 subscriptions: Vec::new(),
1932 analytics_config: Vec::new(),
1933 session_key: None,
1934 session_gap_ms: None,
1935 retention_duration_ms: None,
1936 analytical_storage: None,
1937 }
1938}
1939
1940fn declared_column_contract_from_ddl(
1941 column: &CreateColumnDef,
1942) -> crate::physical::DeclaredColumnContract {
1943 crate::physical::DeclaredColumnContract {
1944 name: column.name.clone(),
1945 data_type: column.data_type.clone(),
1946 sql_type: Some(column.sql_type.clone()),
1947 not_null: column.not_null,
1948 default: column.default.clone(),
1949 compress: column.compress,
1950 unique: column.unique,
1951 primary_key: column.primary_key,
1952 enum_variants: column.enum_variants.clone(),
1953 array_element: column.array_element.clone(),
1954 decimal_precision: column.decimal_precision,
1955 }
1956}
1957
1958fn apply_alter_operations_to_contract(
1959 contract: &mut crate::physical::CollectionContract,
1960 operations: &[AlterOperation],
1961) {
1962 if contract.table_def.is_none() {
1963 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1964 }
1965 for operation in operations {
1966 match operation {
1967 AlterOperation::AddColumn(column) => {
1968 if !contract
1969 .declared_columns
1970 .iter()
1971 .any(|existing| existing.name == column.name)
1972 {
1973 contract
1974 .declared_columns
1975 .push(declared_column_contract_from_ddl(column));
1976 }
1977 if let Some(table_def) = contract.table_def.as_mut() {
1978 if table_def.get_column(&column.name).is_none() {
1979 if let Ok(column_def) = column_def_from_ddl(column) {
1980 if column.primary_key {
1981 table_def.primary_key.push(column.name.clone());
1982 table_def.constraints.push(
1983 crate::storage::schema::Constraint::new(
1984 format!("pk_{}", column.name),
1985 crate::storage::schema::ConstraintType::PrimaryKey,
1986 )
1987 .on_columns(vec![column.name.clone()]),
1988 );
1989 }
1990 if column.unique {
1991 table_def.constraints.push(
1992 crate::storage::schema::Constraint::new(
1993 format!("uniq_{}", column.name),
1994 crate::storage::schema::ConstraintType::Unique,
1995 )
1996 .on_columns(vec![column.name.clone()]),
1997 );
1998 }
1999 if column.not_null {
2000 table_def.constraints.push(
2001 crate::storage::schema::Constraint::new(
2002 format!("not_null_{}", column.name),
2003 crate::storage::schema::ConstraintType::NotNull,
2004 )
2005 .on_columns(vec![column.name.clone()]),
2006 );
2007 }
2008 table_def.columns.push(column_def);
2009 }
2010 }
2011 }
2012 }
2013 AlterOperation::DropColumn(name) => {
2014 contract
2015 .declared_columns
2016 .retain(|column| column.name != *name);
2017 if let Some(table_def) = contract.table_def.as_mut() {
2018 if let Some(index) = table_def.column_index(name) {
2019 table_def.columns.remove(index);
2020 }
2021 table_def.primary_key.retain(|column| column != name);
2022 table_def.constraints.retain(|constraint| {
2023 !constraint.columns.iter().any(|column| column == name)
2024 });
2025 table_def
2026 .indexes
2027 .retain(|index| !index.columns.iter().any(|column| column == name));
2028 }
2029 }
2030 AlterOperation::RenameColumn { from, to } => {
2031 if contract
2032 .declared_columns
2033 .iter()
2034 .any(|column| column.name == *to)
2035 {
2036 continue;
2037 }
2038 if let Some(column) = contract
2039 .declared_columns
2040 .iter_mut()
2041 .find(|column| column.name == *from)
2042 {
2043 column.name = to.clone();
2044 }
2045 if let Some(table_def) = contract.table_def.as_mut() {
2046 if let Some(column) = table_def
2047 .columns
2048 .iter_mut()
2049 .find(|column| column.name == *from)
2050 {
2051 column.name = to.clone();
2052 }
2053 for primary_key in &mut table_def.primary_key {
2054 if *primary_key == *from {
2055 *primary_key = to.clone();
2056 }
2057 }
2058 for constraint in &mut table_def.constraints {
2059 for column in &mut constraint.columns {
2060 if *column == *from {
2061 *column = to.clone();
2062 }
2063 }
2064 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
2065 for column in ref_columns {
2066 if *column == *from {
2067 *column = to.clone();
2068 }
2069 }
2070 }
2071 }
2072 for index in &mut table_def.indexes {
2073 for column in &mut index.columns {
2074 if *column == *from {
2075 *column = to.clone();
2076 }
2077 }
2078 }
2079 }
2080 }
2081 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
2084 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
2088 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
2091 AlterOperation::SetAppendOnly(on) => {
2092 contract.append_only = *on;
2093 }
2094 AlterOperation::SetVersioned(_) => {}
2097 AlterOperation::EnableEvents(subscription) => {
2098 let mut subscription = subscription.clone();
2099 subscription.source = contract.name.clone();
2100 subscription.enabled = true;
2101 if let Some(existing) = contract
2102 .subscriptions
2103 .iter_mut()
2104 .find(|existing| existing.target_queue == subscription.target_queue)
2105 {
2106 *existing = subscription;
2107 } else {
2108 contract.subscriptions.push(subscription);
2109 }
2110 }
2111 AlterOperation::DisableEvents => {
2112 for subscription in &mut contract.subscriptions {
2113 subscription.enabled = false;
2114 }
2115 }
2116 AlterOperation::AddSubscription { name, descriptor } => {
2117 let mut sub = descriptor.clone();
2118 sub.name = name.clone();
2119 sub.source = contract.name.clone();
2120 sub.enabled = true;
2121 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
2122 {
2123 *existing = sub;
2124 } else {
2125 contract.subscriptions.push(sub);
2126 }
2127 }
2128 AlterOperation::DropSubscription { name } => {
2129 contract.subscriptions.retain(|s| s.name != *name);
2130 }
2131 AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
2136 AlterOperation::SetRetention { duration_ms } => {
2137 contract.retention_duration_ms = Some(*duration_ms);
2138 }
2139 AlterOperation::UnsetRetention => {
2140 contract.retention_duration_ms = None;
2141 }
2142 AlterOperation::AddAnalytics(views) => {
2146 for view in views {
2147 if !contract
2151 .analytics_config
2152 .iter()
2153 .any(|existing| existing.output == view.output)
2154 {
2155 contract.analytics_config.push(view.clone());
2156 }
2157 }
2158 }
2159 AlterOperation::DropAnalytics(output) => {
2160 contract
2161 .analytics_config
2162 .retain(|view| view.output != *output);
2163 }
2164 }
2165 }
2166}
2167
2168pub(crate) fn retention_timestamp_column_exists(
2173 contract: &crate::physical::CollectionContract,
2174) -> bool {
2175 if contract.timestamps_enabled {
2176 return true;
2177 }
2178 if matches!(
2179 contract.declared_model,
2180 crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
2181 ) {
2182 return true;
2186 }
2187 contract
2188 .declared_columns
2189 .iter()
2190 .any(|column| is_temporal_data_type(&column.data_type))
2191}
2192
2193fn is_temporal_data_type(data_type: &str) -> bool {
2194 let upper = data_type.to_ascii_uppercase();
2195 matches!(
2196 upper.as_str(),
2197 "TIMESTAMP" | "TIMESTAMPMS" | "TIMESTAMP_MS" | "DATETIME" | "DATE"
2198 )
2199}
2200
2201fn validate_event_subscriptions(
2202 runtime: &RedDBRuntime,
2203 source: &str,
2204 subscriptions: &[crate::catalog::SubscriptionDescriptor],
2205) -> RedDBResult<()> {
2206 for subscription in subscriptions
2207 .iter()
2208 .filter(|subscription| subscription.enabled)
2209 {
2210 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2211 return Err(RedDBError::Query(
2212 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2213 ));
2214 }
2215 validate_subscription_auth(runtime, source, subscription)?;
2216 if subscription.target_queue == source
2217 || subscription_would_create_cycle(
2218 &runtime.inner.db,
2219 source,
2220 &subscription.target_queue,
2221 )
2222 {
2223 return Err(RedDBError::Query(
2224 "subscription would create cycle".to_string(),
2225 ));
2226 }
2227 audit_subscription_redact_gap(runtime, source, subscription);
2228 }
2229 Ok(())
2230}
2231
2232fn validate_subscription_auth(
2233 runtime: &RedDBRuntime,
2234 source: &str,
2235 subscription: &crate::catalog::SubscriptionDescriptor,
2236) -> RedDBResult<()> {
2237 let auth_store = match runtime.inner.auth_store.read().clone() {
2238 Some(store) => store,
2239 None => return Ok(()),
2240 };
2241 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2242 Some(identity) => identity,
2243 None => return Ok(()),
2244 };
2245 let tenant = crate::runtime::impl_core::current_tenant();
2246 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2247
2248 if auth_store.iam_authorization_enabled() {
2249 let ctx = crate::auth::policies::EvalContext {
2250 principal_tenant: tenant.clone(),
2251 current_tenant: tenant.clone(),
2252 peer_ip: None,
2253 mfa_present: false,
2254 now_ms: crate::auth::now_ms(),
2255 principal_is_admin_role: role == crate::auth::Role::Admin,
2256 principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2257 principal_is_platform_scoped: principal.tenant.is_none(),
2258 };
2259 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2260 if let Some(t) = tenant.as_deref() {
2261 source_resource = source_resource.with_tenant(t.to_string());
2262 }
2263 if !auth_store.check_policy_authz_with_role(
2264 &principal,
2265 "select",
2266 &source_resource,
2267 &ctx,
2268 role,
2269 ) {
2270 return Err(RedDBError::Query(format!(
2271 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2272 principal, source_resource.kind, source_resource.name
2273 )));
2274 }
2275
2276 let mut target_resource =
2277 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2278 if let Some(t) = tenant.as_deref() {
2279 target_resource = target_resource.with_tenant(t.to_string());
2280 }
2281 if !auth_store.check_policy_authz_with_role(
2282 &principal,
2283 "write",
2284 &target_resource,
2285 &ctx,
2286 role,
2287 ) {
2288 return Err(RedDBError::Query(format!(
2289 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2290 principal, target_resource.kind, target_resource.name
2291 )));
2292 }
2293 return Ok(());
2294 }
2295
2296 let ctx = crate::auth::privileges::AuthzContext {
2297 principal: &username,
2298 effective_role: role,
2299 tenant: tenant.as_deref(),
2300 };
2301 auth_store
2302 .check_grant(
2303 &ctx,
2304 crate::auth::privileges::Action::Select,
2305 &crate::auth::privileges::Resource::table_from_name(source),
2306 )
2307 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2308 auth_store
2309 .check_grant(
2310 &ctx,
2311 crate::auth::privileges::Action::Insert,
2312 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2313 )
2314 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2315 Ok(())
2316}
2317
2318fn audit_subscription_redact_gap(
2319 runtime: &RedDBRuntime,
2320 source: &str,
2321 subscription: &crate::catalog::SubscriptionDescriptor,
2322) {
2323 let auth_store = match runtime.inner.auth_store.read().clone() {
2324 Some(store) if store.iam_authorization_enabled() => store,
2325 _ => return,
2326 };
2327 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2328 Some(identity) => identity,
2329 None => return,
2330 };
2331 let tenant = crate::runtime::impl_core::current_tenant();
2332 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2333 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2334 if missing.is_empty() {
2335 return;
2336 }
2337
2338 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2339 tracing::warn!(
2340 target: "reddb::operator",
2341 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2342 source,
2343 subscription.target_queue,
2344 columns
2345 );
2346 let mut event = AuditEvent::builder("subscription_redact_gap")
2347 .principal(username)
2348 .source(AuditAuthSource::System)
2349 .resource(format!(
2350 "subscription:{}->{}",
2351 source, subscription.target_queue
2352 ))
2353 .outcome(Outcome::Success)
2354 .field(AuditFieldEscaper::field("source", source))
2355 .field(AuditFieldEscaper::field(
2356 "target_queue",
2357 subscription.target_queue.clone(),
2358 ))
2359 .field(AuditFieldEscaper::field(
2360 "subscription",
2361 subscription.name.clone(),
2362 ))
2363 .field(AuditFieldEscaper::field("columns", columns))
2364 .field(AuditFieldEscaper::field("role", role.as_str()));
2365 if let Some(t) = tenant {
2366 event = event.tenant(t);
2367 }
2368 runtime.inner.audit_log.record_event(event.build());
2369}
2370
2371fn subscription_redact_gap_columns(
2372 auth_store: &crate::auth::store::AuthStore,
2373 principal: &crate::auth::UserId,
2374 source: &str,
2375 subscription: &crate::catalog::SubscriptionDescriptor,
2376) -> BTreeSet<String> {
2377 let redacted: HashSet<String> = subscription
2378 .redact_fields
2379 .iter()
2380 .map(|field| field.to_ascii_lowercase())
2381 .collect();
2382 auth_store
2383 .effective_policies(principal)
2384 .iter()
2385 .flat_map(|policy| policy.statements.iter())
2386 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2387 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2388 .flat_map(|statement| statement.resources.iter())
2389 .filter_map(|resource| denied_column_for_source(resource, source))
2390 .filter(|column| !redact_covers_column(&redacted, source, column))
2391 .collect()
2392}
2393
2394fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2395 match pattern {
2396 crate::auth::policies::ActionPattern::Wildcard => true,
2397 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2398 crate::auth::policies::ActionPattern::Prefix(prefix) => {
2399 "select".len() > prefix.len() + 1
2400 && "select".starts_with(prefix)
2401 && "select".as_bytes()[prefix.len()] == b':'
2402 }
2403 }
2404}
2405
2406fn denied_column_for_source(
2407 resource: &crate::auth::policies::ResourcePattern,
2408 source: &str,
2409) -> Option<String> {
2410 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2411 return None;
2412 };
2413 if kind != "column" {
2414 return None;
2415 }
2416 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2417 (column.table_resource_name() == source).then_some(column.column)
2418}
2419
2420fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
2421 let column = column.to_ascii_lowercase();
2422 let qualified = format!("{}.{}", source.to_ascii_lowercase(), column);
2423 redacted.contains("*") || redacted.contains(&column) || redacted.contains(&qualified)
2424}
2425
2426fn subscription_would_create_cycle(
2427 db: &crate::storage::unified::devx::RedDB,
2428 source: &str,
2429 target: &str,
2430) -> bool {
2431 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2432 for contract in db.collection_contracts() {
2433 for subscription in contract
2434 .subscriptions
2435 .into_iter()
2436 .filter(|subscription| subscription.enabled)
2437 {
2438 graph
2439 .entry(subscription.source)
2440 .or_default()
2441 .push(subscription.target_queue);
2442 }
2443 }
2444 graph
2445 .entry(source.to_string())
2446 .or_default()
2447 .push(target.to_string());
2448
2449 let mut stack = vec![target.to_string()];
2450 let mut seen = HashSet::new();
2451 while let Some(node) = stack.pop() {
2452 if node == source {
2453 return true;
2454 }
2455 if !seen.insert(node.clone()) {
2456 continue;
2457 }
2458 if let Some(next) = graph.get(&node) {
2459 stack.extend(next.iter().cloned());
2460 }
2461 }
2462 false
2463}
2464
2465pub(crate) fn ensure_event_target_queue_pub(
2466 runtime: &RedDBRuntime,
2467 queue: &str,
2468) -> RedDBResult<()> {
2469 ensure_event_target_queue(runtime, queue)
2470}
2471
2472fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2473 let store = runtime.inner.db.store();
2474 if store.get_collection(queue).is_some() {
2475 return Ok(());
2476 }
2477 store
2478 .create_collection(queue)
2479 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2480 runtime
2481 .inner
2482 .db
2483 .save_collection_contract(event_queue_collection_contract(queue))
2484 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2485 store.set_config_tree(
2486 &format!("queue.{queue}.mode"),
2487 &crate::serde_json::Value::String("fanout".to_string()),
2488 );
2489 Ok(())
2490}
2491
2492fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2493 let now = current_unix_ms();
2494 crate::physical::CollectionContract {
2495 name: queue.to_string(),
2496 declared_model: crate::catalog::CollectionModel::Queue,
2497 schema_mode: crate::catalog::SchemaMode::Dynamic,
2498 origin: crate::physical::ContractOrigin::Implicit,
2499 version: 1,
2500 created_at_unix_ms: now,
2501 updated_at_unix_ms: now,
2502 default_ttl_ms: None,
2503 vector_dimension: None,
2504 vector_metric: None,
2505 context_index_fields: Vec::new(),
2506 declared_columns: Vec::new(),
2507 table_def: None,
2508 timestamps_enabled: false,
2509 context_index_enabled: false,
2510 metrics_raw_retention_ms: None,
2511 metrics_rollup_policies: Vec::new(),
2512 metrics_tenant_identity: None,
2513 metrics_namespace: None,
2514 append_only: true,
2515 subscriptions: Vec::new(),
2516 analytics_config: Vec::new(),
2517 session_key: None,
2518 session_gap_ms: None,
2519 retention_duration_ms: None,
2520 analytical_storage: None,
2521 }
2522}
2523
2524fn build_table_def_from_create_table(
2525 query: &CreateTableQuery,
2526) -> RedDBResult<crate::storage::schema::TableDef> {
2527 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2528 for column in &query.columns {
2529 if column.primary_key {
2530 table.primary_key.push(column.name.clone());
2531 table.constraints.push(
2532 crate::storage::schema::Constraint::new(
2533 format!("pk_{}", column.name),
2534 crate::storage::schema::ConstraintType::PrimaryKey,
2535 )
2536 .on_columns(vec![column.name.clone()]),
2537 );
2538 }
2539 if column.unique {
2540 table.constraints.push(
2541 crate::storage::schema::Constraint::new(
2542 format!("uniq_{}", column.name),
2543 crate::storage::schema::ConstraintType::Unique,
2544 )
2545 .on_columns(vec![column.name.clone()]),
2546 );
2547 }
2548 if column.not_null {
2549 table.constraints.push(
2550 crate::storage::schema::Constraint::new(
2551 format!("not_null_{}", column.name),
2552 crate::storage::schema::ConstraintType::NotNull,
2553 )
2554 .on_columns(vec![column.name.clone()]),
2555 );
2556 }
2557 table.columns.push(column_def_from_ddl(column)?);
2558 }
2559 if query.timestamps {
2564 table.columns.push(
2565 crate::storage::schema::ColumnDef::new(
2566 "created_at".to_string(),
2567 crate::storage::schema::DataType::UnsignedInteger,
2568 )
2569 .not_null(),
2570 );
2571 table.columns.push(
2572 crate::storage::schema::ColumnDef::new(
2573 "updated_at".to_string(),
2574 crate::storage::schema::DataType::UnsignedInteger,
2575 )
2576 .not_null(),
2577 );
2578 table.constraints.push(
2579 crate::storage::schema::Constraint::new(
2580 "not_null_created_at".to_string(),
2581 crate::storage::schema::ConstraintType::NotNull,
2582 )
2583 .on_columns(vec!["created_at".to_string()]),
2584 );
2585 table.constraints.push(
2586 crate::storage::schema::Constraint::new(
2587 "not_null_updated_at".to_string(),
2588 crate::storage::schema::ConstraintType::NotNull,
2589 )
2590 .on_columns(vec!["updated_at".to_string()]),
2591 );
2592 }
2593 table
2594 .validate()
2595 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2596 Ok(table)
2597}
2598
2599fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2600 let data_type = resolve_declared_data_type(&column.data_type)
2601 .map_err(|err| RedDBError::Query(err.to_string()))?;
2602 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2603 if column.not_null {
2604 column_def = column_def.not_null();
2605 }
2606 if let Some(default) = &column.default {
2607 column_def = column_def.with_default(default.as_bytes().to_vec());
2608 }
2609 if column.compress.unwrap_or(0) > 0 {
2610 column_def = column_def.compressed();
2611 }
2612 if !column.enum_variants.is_empty() {
2613 column_def = column_def.with_variants(column.enum_variants.clone());
2614 }
2615 if let Some(precision) = column.decimal_precision {
2616 column_def = column_def.with_precision(precision);
2617 }
2618 if let Some(element_type) = &column.array_element {
2619 column_def = column_def.with_element_type(
2620 resolve_declared_data_type(element_type)
2621 .map_err(|err| RedDBError::Query(err.to_string()))?,
2622 );
2623 }
2624 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2625 if column.unique {
2626 column_def = column_def.with_metadata("unique", "true");
2627 }
2628 if column.primary_key {
2629 column_def = column_def.with_metadata("primary_key", "true");
2630 }
2631 Ok(column_def)
2632}
2633
2634fn current_unix_ms() -> u128 {
2635 std::time::SystemTime::now()
2636 .duration_since(std::time::UNIX_EPOCH)
2637 .unwrap_or_default()
2638 .as_millis()
2639}
2640
2641#[cfg(test)]
2642mod tests {
2643 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2644 use crate::auth::store::{AuthStore, PrincipalRef};
2645 use crate::auth::UserId;
2646 use crate::auth::{AuthConfig, Role};
2647 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2648 use crate::storage::schema::Value;
2649 use crate::{RedDBOptions, RedDBRuntime};
2650 use std::sync::Arc;
2651
2652 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2653 Policy {
2654 id: id.to_string(),
2655 version: 1,
2656 tenant: None,
2657 created_at: 0,
2658 updated_at: 0,
2659 statements: vec![Statement {
2660 sid: None,
2661 effect: Effect::Allow,
2662 actions: vec![ActionPattern::Exact(action.to_string())],
2663 resources: vec![ResourcePattern::Exact {
2664 kind: "collection".to_string(),
2665 name: collection.to_string(),
2666 }],
2667 condition: None,
2668 }],
2669 }
2670 }
2671
2672 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2673 let store = Arc::new(AuthStore::new(AuthConfig::default()));
2674 *rt.inner.auth_store.write() = Some(store.clone());
2675 store
2676 }
2677
2678 #[test]
2679 fn drop_denied_without_iam_policy() {
2680 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2681 rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2682 let store = wire_auth_store(&rt);
2683 let select_only = Policy {
2685 id: "select-only".to_string(),
2686 version: 1,
2687 tenant: None,
2688 created_at: 0,
2689 updated_at: 0,
2690 statements: vec![Statement {
2691 sid: None,
2692 effect: Effect::Allow,
2693 actions: vec![ActionPattern::Exact("select".to_string())],
2694 resources: vec![ResourcePattern::Wildcard],
2695 condition: None,
2696 }],
2697 };
2698 store.put_policy_internal(select_only).unwrap();
2699 let alice = UserId::from_parts(None, "alice");
2700 store
2701 .attach_policy(PrincipalRef::User(alice), "select-only")
2702 .unwrap();
2703 set_current_auth_identity("alice".to_string(), Role::Write);
2704 let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2705 clear_current_auth_identity();
2706 assert!(
2707 format!("{err}").contains("denied by IAM policy"),
2708 "got: {err}"
2709 );
2710 }
2711
2712 #[test]
2713 fn drop_allowed_with_explicit_iam_policy() {
2714 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2715 rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2716 let store = wire_auth_store(&rt);
2717 let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2718 store.put_policy_internal(policy).unwrap();
2719 let bob = UserId::from_parts(None, "bob");
2720 store
2721 .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2722 .unwrap();
2723 set_current_auth_identity("bob".to_string(), Role::Write);
2724 rt.execute_query("DROP TABLE bar").unwrap();
2725 clear_current_auth_identity();
2726 }
2727
2728 #[test]
2729 fn drop_allowed_with_wildcard_iam_policy() {
2730 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2731 rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2732 let store = wire_auth_store(&rt);
2733 let policy = Policy {
2734 id: "allow-drop-all".to_string(),
2735 version: 1,
2736 tenant: None,
2737 created_at: 0,
2738 updated_at: 0,
2739 statements: vec![Statement {
2740 sid: None,
2741 effect: Effect::Allow,
2742 actions: vec![ActionPattern::Exact("drop".to_string())],
2743 resources: vec![ResourcePattern::Wildcard],
2744 condition: None,
2745 }],
2746 };
2747 store.put_policy_internal(policy).unwrap();
2748 let carl = UserId::from_parts(None, "carl");
2749 store
2750 .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2751 .unwrap();
2752 set_current_auth_identity("carl".to_string(), Role::Write);
2753 rt.execute_query("DROP TABLE baz").unwrap();
2754 clear_current_auth_identity();
2755 }
2756
2757 #[test]
2758 fn truncate_denied_without_iam_policy() {
2759 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2760 rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2761 let store = wire_auth_store(&rt);
2762 store
2769 .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
2770 let select_only = Policy {
2772 id: "select-only-2".to_string(),
2773 version: 1,
2774 tenant: None,
2775 created_at: 0,
2776 updated_at: 0,
2777 statements: vec![Statement {
2778 sid: None,
2779 effect: Effect::Allow,
2780 actions: vec![ActionPattern::Exact("select".to_string())],
2781 resources: vec![ResourcePattern::Wildcard],
2782 condition: None,
2783 }],
2784 };
2785 store.put_policy_internal(select_only).unwrap();
2786 let dana = UserId::from_parts(None, "dana");
2787 store
2788 .attach_policy(PrincipalRef::User(dana), "select-only-2")
2789 .unwrap();
2790 set_current_auth_identity("dana".to_string(), Role::Write);
2791 let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2792 clear_current_auth_identity();
2793 assert!(
2794 format!("{err}").contains("denied by IAM policy"),
2795 "got: {err}"
2796 );
2797 }
2798
2799 #[test]
2800 fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2801 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2802 rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2803 .unwrap();
2804 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2805 .unwrap();
2806 rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2807 .unwrap();
2808
2809 let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2810 assert_eq!(truncated.statement_type, "truncate");
2811 assert_eq!(truncated.affected_rows, 0);
2812
2813 let empty = rt.execute_query("SELECT id FROM users").unwrap();
2814 assert!(empty.result.records.is_empty());
2815
2816 rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2817 .unwrap();
2818 let selected = rt
2819 .execute_query("SELECT name FROM users WHERE id = 3")
2820 .unwrap();
2821 let name = selected.result.records[0].get("name").unwrap();
2822 assert_eq!(name, &Value::text("cy"));
2823 assert!(rt.db().collection_contract("users").is_some());
2824 assert!(rt
2825 .inner
2826 .index_store
2827 .list_indices("users")
2828 .iter()
2829 .any(|index| index.name == "idx_users_id"));
2830 }
2831
2832 #[test]
2833 fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2834 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2835 rt.execute_query("CREATE QUEUE tasks").unwrap();
2836 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2837
2838 let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2839 assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2840
2841 rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2842 let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2843 assert_eq!(
2844 len.result.records[0].get("len"),
2845 Some(&Value::UnsignedInteger(0))
2846 );
2847 }
2848
2849 #[test]
2850 fn truncate_system_schema_is_read_only() {
2851 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2852 let err = rt
2853 .execute_query("TRUNCATE COLLECTION red.collections")
2854 .unwrap_err();
2855 assert!(format!("{err}").contains("system schema is read-only"));
2856 }
2857
2858 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2861 let result = rt
2862 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2863 .expect("peek queue");
2864 result
2865 .result
2866 .records
2867 .iter()
2868 .map(
2869 |record| match record.get("payload").expect("payload column") {
2870 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2871 other => panic!("expected JSON queue payload, got {other:?}"),
2872 },
2873 )
2874 .collect()
2875 }
2876
2877 #[test]
2880 fn truncate_event_enabled_table_emits_single_truncate_event() {
2881 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2882 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2883 .unwrap();
2884 rt.execute_query(
2885 "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2886 )
2887 .unwrap();
2888
2889 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2891
2892 rt.execute_query("TRUNCATE TABLE users").unwrap();
2893
2894 let events = queue_payloads(&rt, "users_events");
2895 assert_eq!(
2897 events.len(),
2898 1,
2899 "expected 1 truncate event, got {}",
2900 events.len()
2901 );
2902 let ev = events[0].as_object().expect("event is object");
2903 assert_eq!(
2904 ev.get("op").and_then(crate::json::Value::as_str),
2905 Some("truncate")
2906 );
2907 assert_eq!(
2908 ev.get("collection").and_then(crate::json::Value::as_str),
2909 Some("users")
2910 );
2911 assert_eq!(
2912 ev.get("entities_count")
2913 .and_then(crate::json::Value::as_u64),
2914 Some(3)
2915 );
2916 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2917 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2918 assert!(ev
2919 .get("event_id")
2920 .and_then(crate::json::Value::as_str)
2921 .is_some_and(|s| !s.is_empty()));
2922 }
2923
2924 #[test]
2926 fn truncate_no_events_collection_emits_nothing() {
2927 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2928 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2929 .unwrap();
2930 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2931 .unwrap();
2932 rt.execute_query("TRUNCATE TABLE plain").unwrap();
2934 let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2936 assert!(rows.result.records.is_empty());
2937 }
2938
2939 #[test]
2943 fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2944 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2945 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2946 .unwrap();
2947 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2948 .unwrap();
2949
2950 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2952
2953 rt.execute_query("DROP TABLE users").unwrap();
2954
2955 let events = queue_payloads(&rt, "users_events");
2957 assert_eq!(
2958 events.len(),
2959 1,
2960 "expected 1 collection_dropped event, got {}",
2961 events.len()
2962 );
2963 let ev = events[0].as_object().expect("event is object");
2964 assert_eq!(
2965 ev.get("op").and_then(crate::json::Value::as_str),
2966 Some("collection_dropped")
2967 );
2968 assert_eq!(
2969 ev.get("collection").and_then(crate::json::Value::as_str),
2970 Some("users")
2971 );
2972 assert_eq!(
2973 ev.get("final_entities_count")
2974 .and_then(crate::json::Value::as_u64),
2975 Some(2)
2976 );
2977 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2978 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2979 assert!(ev
2980 .get("event_id")
2981 .and_then(crate::json::Value::as_str)
2982 .is_some_and(|s| !s.is_empty()));
2983
2984 let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2986 assert!(
2987 format!("{err}").contains("users"),
2988 "expected not-found error"
2989 );
2990 }
2991
2992 #[test]
2995 fn drop_no_events_collection_emits_nothing() {
2996 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2997 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2998 .unwrap();
2999 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
3000 .unwrap();
3001 rt.execute_query("DROP TABLE plain").unwrap();
3002 let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
3004 assert!(format!("{err}").contains("plain"));
3005 }
3006
3007 #[test]
3011 fn ops_filter_insert_only_ignores_update_and_delete() {
3012 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3013 rt.execute_query(
3014 "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
3015 )
3016 .unwrap();
3017 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
3018 .unwrap();
3019 rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
3020 .unwrap();
3021 rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
3022
3023 let events = queue_payloads(&rt, "items_events");
3024 assert_eq!(
3026 events.len(),
3027 1,
3028 "expected 1 insert event, got {}",
3029 events.len()
3030 );
3031 assert_eq!(
3032 events[0]
3033 .as_object()
3034 .unwrap()
3035 .get("op")
3036 .and_then(crate::json::Value::as_str),
3037 Some("insert")
3038 );
3039 }
3040
3041 #[test]
3043 fn where_filter_skips_rows_that_do_not_match() {
3044 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3045 rt.execute_query(
3046 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
3047 )
3048 .unwrap();
3049
3050 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
3052 .unwrap();
3053 rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
3055 .unwrap();
3056
3057 let events = queue_payloads(&rt, "users_events");
3058 assert_eq!(
3059 events.len(),
3060 1,
3061 "expected 1 event (only active), got {}",
3062 events.len()
3063 );
3064 let ev = events[0].as_object().unwrap();
3065 assert_eq!(
3066 ev.get("op").and_then(crate::json::Value::as_str),
3067 Some("insert")
3068 );
3069 let after = ev.get("after").unwrap().as_object().unwrap();
3070 assert_eq!(
3071 after.get("status").and_then(crate::json::Value::as_str),
3072 Some("active")
3073 );
3074 }
3075
3076 #[test]
3078 fn ops_filter_and_where_filter_combined() {
3079 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3080 rt.execute_query(
3081 "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
3082 )
3083 .unwrap();
3084
3085 rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
3087 .unwrap();
3088 rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
3090 .unwrap();
3091 rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
3093 .unwrap();
3094 rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
3096
3097 let events = queue_payloads(&rt, "items_events");
3098 assert_eq!(
3100 events.len(),
3101 1,
3102 "expected 1 event, got {}: {events:?}",
3103 events.len()
3104 );
3105 assert_eq!(
3106 events[0]
3107 .as_object()
3108 .unwrap()
3109 .get("op")
3110 .and_then(crate::json::Value::as_str),
3111 Some("insert")
3112 );
3113 }
3114
3115 #[test]
3117 fn where_filter_on_delete_checks_before_state() {
3118 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3119 rt.execute_query(
3120 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
3121 )
3122 .unwrap();
3123
3124 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
3125 .unwrap();
3126
3127 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3129 rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
3131
3132 let events = queue_payloads(&rt, "users_events");
3133 assert_eq!(
3134 events.len(),
3135 1,
3136 "expected 1 delete event, got {}",
3137 events.len()
3138 );
3139 let ev = events[0].as_object().unwrap();
3140 assert_eq!(
3141 ev.get("op").and_then(crate::json::Value::as_str),
3142 Some("delete")
3143 );
3144 }
3145
3146 #[test]
3150 fn alter_add_column_on_event_enabled_table_succeeds() {
3151 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3152 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
3153 .unwrap();
3154 rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
3156 .unwrap();
3157 let contract = rt.db().collection_contract("users").unwrap();
3159 assert!(
3160 contract.declared_columns.iter().any(|c| c.name == "phone"),
3161 "phone column should be in contract"
3162 );
3163 assert!(
3165 contract.subscriptions.iter().any(|s| s.enabled),
3166 "subscription should remain enabled"
3167 );
3168 }
3169
3170 #[test]
3173 fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
3174 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3175 rt.execute_query(
3176 "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
3177 )
3178 .unwrap();
3179 rt.execute_query("ALTER TABLE items DROP COLUMN secret")
3181 .unwrap();
3182 let contract = rt.db().collection_contract("items").unwrap();
3183 assert!(
3184 !contract.declared_columns.iter().any(|c| c.name == "secret"),
3185 "secret column should be removed"
3186 );
3187 rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
3189 .unwrap();
3190 assert!(
3192 contract.subscriptions.iter().any(|s| s.enabled),
3193 "subscription should remain enabled"
3194 );
3195 }
3196
3197 #[test]
3203 fn create_vector_marks_collection_as_turbo_baseline() {
3204 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3205 rt.execute_query("CREATE VECTOR embeddings DIM 4").unwrap();
3206 let store = rt.db().store();
3207 assert!(
3208 crate::runtime::vector_turbo_kind::is_turbo(&store, "embeddings"),
3209 "new vector collections must be turbo-marked baseline"
3210 );
3211 assert!(
3212 rt.db().turbo_state("embeddings").is_some(),
3213 "turbo_state must materialise after CREATE VECTOR"
3214 );
3215 }
3216
3217 #[test]
3221 fn create_table_does_not_mark_turbo() {
3222 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3223 rt.execute_query("CREATE TABLE plain (id INT)").unwrap();
3224 let store = rt.db().store();
3225 assert!(
3226 !crate::runtime::vector_turbo_kind::is_turbo(&store, "plain"),
3227 "non-vector collections must not gain the turbo marker"
3228 );
3229 assert!(rt.db().turbo_state("plain").is_none());
3230 }
3231
3232 #[test]
3235 fn create_collection_kind_vector_turbo_still_marked() {
3236 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3237 rt.execute_query("CREATE COLLECTION turbo_v KIND vector.turbo DIM 4")
3238 .unwrap();
3239 let store = rt.db().store();
3240 assert!(crate::runtime::vector_turbo_kind::is_turbo(
3241 &store, "turbo_v"
3242 ));
3243 assert!(rt.db().turbo_state("turbo_v").is_some());
3244 }
3245}