1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
15fn vault_master_key_ref(collection: &str) -> String {
16 format!("red.vault.{collection}.master_key")
17}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 if let Some(policy) = &query.ai_policy {
40 validate_ai_policy_modalities(policy)?;
41 }
42 crate::reserved_fields::ensure_no_reserved_public_item_fields(
43 query.columns.iter().map(|column| column.name.as_str()),
44 &format!("table '{}'", query.name),
45 )?;
46 let exists = store.get_collection(&query.name).is_some();
48 if exists {
49 if query.if_not_exists {
50 return Ok(RuntimeQueryResult::ok_message(
51 raw_query.to_string(),
52 &format!("table '{}' already exists", query.name),
53 "create",
54 ));
55 }
56 return Err(RedDBError::Query(format!(
57 "table '{}' already exists",
58 query.name
59 )));
60 }
61
62 let contract = collection_contract_from_create_table(query)?;
65 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
66 store
68 .create_collection(&query.name)
69 .map_err(|err| RedDBError::Internal(err.to_string()))?;
70 for subscription in &contract.subscriptions {
71 ensure_event_target_queue(self, &subscription.target_queue)?;
72 }
73 if let Some(default_ttl_ms) = query.default_ttl_ms {
74 self.inner
75 .db
76 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
77 }
78 self.inner
79 .db
80 .save_collection_contract(contract)
81 .map_err(|err| RedDBError::Internal(err.to_string()))?;
82 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
83 store.set_config_tree(
84 &format!("red.collection_tenants.{}", query.name),
85 &crate::serde_json::Value::String(tenant_id),
86 );
87 }
88 self.inner
89 .db
90 .persist_metadata()
91 .map_err(|err| RedDBError::Internal(err.to_string()))?;
92 self.refresh_table_planner_stats(&query.name);
93 self.invalidate_result_cache();
94 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
97 self.schema_vocabulary_apply(
98 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
99 collection: query.name.clone(),
100 columns,
101 type_tags: Vec::new(),
102 description: None,
103 },
104 );
105 if let Some(spec) = &query.partition_by {
112 let kind_str = match spec.kind {
113 crate::storage::query::ast::PartitionKind::Range => "range",
114 crate::storage::query::ast::PartitionKind::List => "list",
115 crate::storage::query::ast::PartitionKind::Hash => "hash",
116 };
117 store.set_config_tree(
118 &format!("partition.{}.by", query.name),
119 &crate::serde_json::Value::String(kind_str.to_string()),
120 );
121 store.set_config_tree(
122 &format!("partition.{}.column", query.name),
123 &crate::serde_json::Value::String(spec.column.clone()),
124 );
125 }
126
127 if let Some(col) = &query.tenant_by {
138 store.set_config_tree(
139 &format!("tenant_tables.{}.column", query.name),
140 &crate::serde_json::Value::String(col.clone()),
141 );
142 self.register_tenant_table(&query.name, col);
143 }
144
145 let ttl_suffix = query
146 .default_ttl_ms
147 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
148 .unwrap_or_default();
149
150 let tenant_suffix = query
151 .tenant_by
152 .as_ref()
153 .map(|col| format!(" (tenant-scoped by {col})"))
154 .unwrap_or_default();
155
156 Ok(RuntimeQueryResult::ok_message(
157 raw_query.to_string(),
158 &format!(
159 "table '{}' created{}{}",
160 query.name, ttl_suffix, tenant_suffix
161 ),
162 "create",
163 ))
164 }
165
166 fn execute_create_keyed_collection(
167 &self,
168 raw_query: &str,
169 query: &CreateTableQuery,
170 ) -> RedDBResult<RuntimeQueryResult> {
171 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
172 if is_system_schema_name(&query.name) {
173 return Err(RedDBError::Query("system schema is read-only".to_string()));
174 }
175 let store = self.inner.db.store();
176 let label = polymorphic_resolver::model_name(query.collection_model);
177 if store.get_collection(&query.name).is_some() {
178 if query.if_not_exists {
179 return Ok(RuntimeQueryResult::ok_message(
180 raw_query.to_string(),
181 &format!("{label} '{}' already exists", query.name),
182 "create",
183 ));
184 }
185 return Err(RedDBError::Query(format!(
186 "{label} '{}' already exists",
187 query.name
188 )));
189 }
190
191 store
192 .create_collection(&query.name)
193 .map_err(|err| RedDBError::Internal(err.to_string()))?;
194 if query.collection_model == CollectionModel::Vault {
195 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
196 let key_scope = if query.vault_own_master_key {
197 "own"
198 } else {
199 "cluster"
200 };
201 store.set_config_tree(
202 &format!("red.vault.{}.key_scope", query.name),
203 &crate::serde_json::Value::String(key_scope.to_string()),
204 );
205 store.set_config_tree(
206 &format!("red.vault.{}.status", query.name),
207 &crate::serde_json::Value::String("sealed".to_string()),
208 );
209 }
210 if query.collection_model == CollectionModel::Metrics {
211 for spec in &query.metrics_rollup_policies {
212 let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
213 .ok_or_else(|| {
214 RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
215 })?;
216 if policy.source != "raw" {
217 return Err(RedDBError::Query(format!(
218 "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
219 spec
220 )));
221 }
222 if !matches!(
223 policy.aggregation.as_str(),
224 "avg" | "sum" | "min" | "max" | "count"
225 ) {
226 return Err(RedDBError::Query(format!(
227 "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
228 spec
229 )));
230 }
231 }
232 if let Some(raw_retention_ms) = query.default_ttl_ms {
233 self.inner
234 .db
235 .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
236 store.set_config_tree(
237 &format!("red.metrics.{}.raw_retention_ms", query.name),
238 &crate::serde_json::Value::Number(raw_retention_ms as f64),
239 );
240 }
241 let tenant_identity = query
242 .tenant_by
243 .clone()
244 .unwrap_or_else(|| "current_tenant".to_string());
245 store.set_config_tree(
246 &format!("red.metrics.{}.tenant_identity", query.name),
247 &crate::serde_json::Value::String(tenant_identity),
248 );
249 store.set_config_tree(
250 &format!("red.metrics.{}.namespace", query.name),
251 &crate::serde_json::Value::String("default".to_string()),
252 );
253 if !query.metrics_rollup_policies.is_empty() {
254 store.set_config_tree(
255 &format!("red.metrics.{}.rollup_policies", query.name),
256 &crate::serde_json::Value::Array(
257 query
258 .metrics_rollup_policies
259 .iter()
260 .cloned()
261 .map(crate::serde_json::Value::String)
262 .collect(),
263 ),
264 );
265 }
266 }
267 let contract = if query.collection_model == CollectionModel::Metrics {
268 metrics_collection_contract(query)
269 } else {
270 keyed_collection_contract(
271 &query.name,
272 query.collection_model,
273 query.analytics_config.clone(),
274 )
275 };
276 self.inner
277 .db
278 .save_collection_contract(contract)
279 .map_err(|err| RedDBError::Internal(err.to_string()))?;
280 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
281 store.set_config_tree(
282 &format!("red.collection_tenants.{}", query.name),
283 &crate::serde_json::Value::String(tenant_id),
284 );
285 }
286 self.inner
287 .db
288 .persist_metadata()
289 .map_err(|err| RedDBError::Internal(err.to_string()))?;
290 self.invalidate_result_cache();
291
292 Ok(RuntimeQueryResult::ok_message(
293 raw_query.to_string(),
294 &format!("{label} '{}' created", query.name),
295 "create",
296 ))
297 }
298
299 pub fn ensure_system_graph_with_analytics(
308 &self,
309 name: &str,
310 outputs: &[crate::catalog::AnalyticsOutput],
311 ) -> RedDBResult<()> {
312 let store = self.inner.db.store();
313 if store.get_collection(name).is_some() {
314 return Ok(());
315 }
316 let analytics_config = outputs
317 .iter()
318 .map(|output| crate::catalog::AnalyticsViewDescriptor {
319 output: *output,
320 algorithm: None,
321 resolution: None,
322 max_iterations: None,
323 tolerance: None,
324 })
325 .collect();
326 store
327 .create_collection(name)
328 .map_err(|err| RedDBError::Internal(err.to_string()))?;
329 let contract = keyed_collection_contract(
330 name,
331 crate::catalog::CollectionModel::Graph,
332 analytics_config,
333 );
334 self.inner
335 .db
336 .save_collection_contract(contract)
337 .map_err(|err| RedDBError::Internal(err.to_string()))?;
338 self.inner
339 .db
340 .persist_metadata()
341 .map_err(|err| RedDBError::Internal(err.to_string()))?;
342 self.invalidate_result_cache_process_only();
343 Ok(())
344 }
345
346 pub fn execute_create_collection(
347 &self,
348 raw_query: &str,
349 query: &CreateCollectionQuery,
350 ) -> RedDBResult<RuntimeQueryResult> {
351 let model = match query.kind.as_str() {
352 "graph" => CollectionModel::Graph,
353 "document" => CollectionModel::Document,
354 "metrics" => CollectionModel::Metrics,
355 "vector.turbo" => {
356 let dimension = query.vector_dimension.ok_or_else(|| {
357 RedDBError::Query(
358 "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
359 )
360 })?;
361 let create = CreateVectorQuery {
362 name: query.name.clone(),
363 dimension,
364 metric: query
365 .vector_metric
366 .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
367 if_not_exists: query.if_not_exists,
368 };
369 let result = self.execute_create_vector(raw_query, &create)?;
370 let store = self.inner.db.store();
377 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
378 self.inner
379 .db
380 .persist_metadata()
381 .map_err(|err| RedDBError::Internal(err.to_string()))?;
382 let _ = self.inner.db.turbo_state(&query.name);
387 return Ok(result);
388 }
389 "blockchain" => CollectionModel::Table,
394 other => {
395 return Err(RedDBError::Query(format!(
396 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
397 )));
398 }
399 };
400 let create = CreateTableQuery {
401 collection_model: model,
402 name: query.name.clone(),
403 columns: Vec::new(),
404 if_not_exists: query.if_not_exists,
405 default_ttl_ms: None,
406 metrics_rollup_policies: Vec::new(),
407 context_index_fields: Vec::new(),
408 context_index_enabled: false,
409 timestamps: false,
410 partition_by: None,
411 tenant_by: None,
412 append_only: false,
413 subscriptions: Vec::new(),
414 analytics_config: Vec::new(),
415 vault_own_master_key: false,
416 ai_policy: None,
417 };
418 let result = self.execute_create_table(raw_query, &create)?;
419 if query.kind == "blockchain" {
420 self.install_blockchain_kind(&query.name)?;
421 }
422 if !query.allowed_signers.is_empty() {
427 let actor = crate::runtime::impl_core::current_user_projected()
428 .unwrap_or_else(|| "@system/create-collection".to_string());
429 crate::runtime::signed_writes_kind::install(
430 &self.inner.db.store(),
431 &query.name,
432 &query.allowed_signers,
433 &actor,
434 );
435 }
436 Ok(result)
437 }
438
439 fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
443 use crate::runtime::blockchain_kind;
444 use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
445 use std::sync::Arc;
446
447 let store = self.inner.db.store();
448 blockchain_kind::mark_as_chain(&store, name);
449
450 let existing_tip = blockchain_kind::chain_tip(&store, name);
451 if existing_tip.height.is_some() {
452 return Ok(());
453 }
454
455 let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
456 let named: std::collections::HashMap<String, crate::storage::schema::Value> =
457 fields.into_iter().collect();
458 let entity = UnifiedEntity::new(
459 EntityId::new(0),
460 EntityKind::TableRow {
461 table: Arc::from(name),
462 row_id: 0,
463 },
464 EntityData::Row(RowData {
465 columns: Vec::new(),
466 named: Some(named),
467 schema: None,
468 }),
469 );
470 store
471 .insert_auto(name, entity)
472 .map_err(|err| RedDBError::Internal(err.to_string()))?;
473 if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
476 self.inner
477 .chain_tip_cache
478 .lock()
479 .insert(name.to_string(), tip);
480 }
481 Ok(())
482 }
483
484 pub fn execute_create_vector(
485 &self,
486 raw_query: &str,
487 query: &CreateVectorQuery,
488 ) -> RedDBResult<RuntimeQueryResult> {
489 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
490 if is_system_schema_name(&query.name) {
491 return Err(RedDBError::Query("system schema is read-only".to_string()));
492 }
493 let store = self.inner.db.store();
494 if store.get_collection(&query.name).is_some() {
495 if query.if_not_exists {
496 return Ok(RuntimeQueryResult::ok_message(
497 raw_query.to_string(),
498 &format!("vector '{}' already exists", query.name),
499 "create",
500 ));
501 }
502 return Err(RedDBError::Query(format!(
503 "vector '{}' already exists",
504 query.name
505 )));
506 }
507
508 store
509 .create_collection(&query.name)
510 .map_err(|err| RedDBError::Internal(err.to_string()))?;
511 self.inner
512 .db
513 .save_collection_contract(vector_collection_contract(query))
514 .map_err(|err| RedDBError::Internal(err.to_string()))?;
515 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
516 store.set_config_tree(
517 &format!("red.collection_tenants.{}", query.name),
518 &crate::serde_json::Value::String(tenant_id),
519 );
520 }
521 crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
530 self.inner
531 .db
532 .persist_metadata()
533 .map_err(|err| RedDBError::Internal(err.to_string()))?;
534 let _ = self.inner.db.turbo_state(&query.name);
539 self.invalidate_result_cache();
540
541 Ok(RuntimeQueryResult::ok_message(
542 raw_query.to_string(),
543 &format!("vector '{}' created", query.name),
544 "create",
545 ))
546 }
547
548 fn provision_vault_key_material(
549 &self,
550 collection: &str,
551 own_master_key: bool,
552 ) -> RedDBResult<()> {
553 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
554 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
555 })?;
556 if !auth_store.is_vault_backed() {
557 return Err(RedDBError::Query(
558 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
559 ));
560 }
561
562 if auth_store.vault_secret_key().is_none() {
563 let key = crate::auth::store::random_bytes(32);
564 auth_store
565 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
566 .map_err(|err| RedDBError::Query(err.to_string()))?;
567 }
568
569 if own_master_key {
570 let key = crate::auth::store::random_bytes(32);
571 auth_store
572 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
573 .map_err(|err| RedDBError::Query(err.to_string()))?;
574 }
575
576 Ok(())
577 }
578
579 pub fn execute_drop_table(
583 &self,
584 raw_query: &str,
585 query: &DropTableQuery,
586 ) -> RedDBResult<RuntimeQueryResult> {
587 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
588 let store = self.inner.db.store();
589
590 if is_system_schema_name(&query.name) {
591 return Err(RedDBError::Query("system schema is read-only".to_string()));
592 }
593
594 let exists = store.get_collection(&query.name).is_some();
595 if !exists {
596 if query.if_exists {
597 return Ok(RuntimeQueryResult::ok_message(
598 raw_query.to_string(),
599 &format!("table '{}' does not exist", query.name),
600 "drop",
601 ));
602 }
603 return Err(RedDBError::NotFound(format!(
604 "table '{}' not found",
605 query.name
606 )));
607 }
608 let actual =
609 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
610 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
611
612 let final_count = store
615 .get_collection(&query.name)
616 .map(|manager| manager.query_all(|_| true).len() as u64)
617 .unwrap_or(0);
618 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
619 self,
620 &query.name,
621 final_count,
622 )?;
623
624 let orphaned_indices: Vec<String> = self
625 .inner
626 .index_store
627 .list_indices(&query.name)
628 .into_iter()
629 .map(|index| index.name)
630 .collect();
631 for name in &orphaned_indices {
632 self.inner.index_store.drop_index(name, &query.name);
633 }
634
635 store
636 .drop_collection(&query.name)
637 .map_err(|err| RedDBError::Internal(err.to_string()))?;
638 self.inner.db.invalidate_vector_index(&query.name);
639 self.inner.db.clear_collection_default_ttl_ms(&query.name);
640 self.inner
641 .db
642 .remove_collection_contract(&query.name)
643 .map_err(|err| RedDBError::Internal(err.to_string()))?;
644 self.clear_table_planner_stats(&query.name);
645 self.invalidate_result_cache();
646 if let Some(store) = self.inner.auth_store.read().clone() {
650 store.invalidate_visible_collections_cache();
651 }
652 self.inner
653 .db
654 .persist_metadata()
655 .map_err(|err| RedDBError::Internal(err.to_string()))?;
656 self.schema_vocabulary_apply(
662 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
663 collection: query.name.clone(),
664 },
665 );
666
667 Ok(RuntimeQueryResult::ok_message(
668 raw_query.to_string(),
669 &format!("table '{}' dropped", query.name),
670 "drop",
671 ))
672 }
673
674 pub fn execute_drop_graph(
675 &self,
676 raw_query: &str,
677 query: &DropGraphQuery,
678 ) -> RedDBResult<RuntimeQueryResult> {
679 self.execute_drop_typed_collection(
680 raw_query,
681 &query.name,
682 query.if_exists,
683 CollectionModel::Graph,
684 "graph",
685 )
686 }
687
688 pub fn execute_drop_vector(
689 &self,
690 raw_query: &str,
691 query: &DropVectorQuery,
692 ) -> RedDBResult<RuntimeQueryResult> {
693 self.execute_drop_typed_collection(
694 raw_query,
695 &query.name,
696 query.if_exists,
697 CollectionModel::Vector,
698 "vector",
699 )
700 }
701
702 pub fn execute_drop_document(
703 &self,
704 raw_query: &str,
705 query: &DropDocumentQuery,
706 ) -> RedDBResult<RuntimeQueryResult> {
707 self.execute_drop_typed_collection(
708 raw_query,
709 &query.name,
710 query.if_exists,
711 CollectionModel::Document,
712 "document",
713 )
714 }
715
716 pub fn execute_drop_kv(
717 &self,
718 raw_query: &str,
719 query: &DropKvQuery,
720 ) -> RedDBResult<RuntimeQueryResult> {
721 let label = polymorphic_resolver::model_name(query.model);
722 self.execute_drop_typed_collection(
723 raw_query,
724 &query.name,
725 query.if_exists,
726 query.model,
727 label,
728 )
729 }
730
731 pub fn execute_drop_collection(
732 &self,
733 raw_query: &str,
734 query: &DropCollectionQuery,
735 ) -> RedDBResult<RuntimeQueryResult> {
736 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
737 if is_system_schema_name(&query.name) {
738 return Err(RedDBError::Query("system schema is read-only".to_string()));
739 }
740 let store = self.inner.db.store();
741 if store.get_collection(&query.name).is_none() {
742 if query.if_exists {
743 return Ok(RuntimeQueryResult::ok_message(
744 raw_query.to_string(),
745 &format!("collection '{}' does not exist", query.name),
746 "drop",
747 ));
748 }
749 return Err(RedDBError::NotFound(format!(
750 "collection '{}' not found",
751 query.name
752 )));
753 }
754
755 let actual =
756 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
757 if let Some(expected) = query.model {
758 polymorphic_resolver::ensure_model_match(expected, actual)?;
759 }
760
761 match actual {
762 CollectionModel::Table => self.execute_drop_table(
763 raw_query,
764 &DropTableQuery {
765 name: query.name.clone(),
766 if_exists: query.if_exists,
767 },
768 ),
769 CollectionModel::TimeSeries => self.execute_drop_timeseries(
770 raw_query,
771 &DropTimeSeriesQuery {
772 name: query.name.clone(),
773 if_exists: query.if_exists,
774 },
775 ),
776 CollectionModel::Queue => self.execute_drop_queue(
777 raw_query,
778 &DropQueueQuery {
779 name: query.name.clone(),
780 if_exists: query.if_exists,
781 },
782 ),
783 CollectionModel::Graph => self.execute_drop_graph(
784 raw_query,
785 &DropGraphQuery {
786 name: query.name.clone(),
787 if_exists: query.if_exists,
788 },
789 ),
790 CollectionModel::Vector => self.execute_drop_vector(
791 raw_query,
792 &DropVectorQuery {
793 name: query.name.clone(),
794 if_exists: query.if_exists,
795 },
796 ),
797 CollectionModel::Document => self.execute_drop_document(
798 raw_query,
799 &DropDocumentQuery {
800 name: query.name.clone(),
801 if_exists: query.if_exists,
802 },
803 ),
804 CollectionModel::Kv => self.execute_drop_kv(
805 raw_query,
806 &DropKvQuery {
807 name: query.name.clone(),
808 if_exists: query.if_exists,
809 model: CollectionModel::Kv,
810 },
811 ),
812 CollectionModel::Config => self.execute_drop_kv(
813 raw_query,
814 &DropKvQuery {
815 name: query.name.clone(),
816 if_exists: query.if_exists,
817 model: CollectionModel::Config,
818 },
819 ),
820 CollectionModel::Vault => self.execute_drop_kv(
821 raw_query,
822 &DropKvQuery {
823 name: query.name.clone(),
824 if_exists: query.if_exists,
825 model: CollectionModel::Vault,
826 },
827 ),
828 CollectionModel::Hll => self.execute_probabilistic_command(
829 raw_query,
830 &ProbabilisticCommand::DropHll {
831 name: query.name.clone(),
832 if_exists: query.if_exists,
833 },
834 ),
835 CollectionModel::Sketch => self.execute_probabilistic_command(
836 raw_query,
837 &ProbabilisticCommand::DropSketch {
838 name: query.name.clone(),
839 if_exists: query.if_exists,
840 },
841 ),
842 CollectionModel::Filter => self.execute_probabilistic_command(
843 raw_query,
844 &ProbabilisticCommand::DropFilter {
845 name: query.name.clone(),
846 if_exists: query.if_exists,
847 },
848 ),
849 CollectionModel::Metrics => self.execute_drop_typed_collection(
850 raw_query,
851 &query.name,
852 query.if_exists,
853 CollectionModel::Metrics,
854 "metrics",
855 ),
856 CollectionModel::Mixed => self.execute_drop_typed_collection(
857 raw_query,
858 &query.name,
859 query.if_exists,
860 CollectionModel::Mixed,
861 "collection",
862 ),
863 }
864 }
865
866 fn require_graph_for_analytics(&self, name: &str) -> RedDBResult<()> {
872 let is_graph = self
873 .inner
874 .db
875 .collection_contract(name)
876 .map(|c| c.declared_model == crate::catalog::CollectionModel::Graph)
877 .unwrap_or(false);
878 if !is_graph {
879 return Err(RedDBError::Query(format!(
880 "ALTER GRAPH ... ANALYTICS: '{name}' is not a graph collection"
881 )));
882 }
883 Ok(())
884 }
885
886 pub fn execute_alter_table(
892 &self,
893 raw_query: &str,
894 query: &AlterTableQuery,
895 ) -> RedDBResult<RuntimeQueryResult> {
896 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
897 let store = self.inner.db.store();
898
899 if store.get_collection(&query.name).is_none() {
901 return Err(RedDBError::NotFound(format!(
902 "table '{}' not found",
903 query.name
904 )));
905 }
906
907 let mut messages = Vec::new();
908
909 let fields_added: Vec<String> = query
911 .operations
912 .iter()
913 .filter_map(|op| {
914 if let AlterOperation::AddColumn(col) = op {
915 Some(col.name.clone())
916 } else {
917 None
918 }
919 })
920 .collect();
921 let fields_removed: Vec<String> = query
922 .operations
923 .iter()
924 .filter_map(|op| {
925 if let AlterOperation::DropColumn(name) = op {
926 Some(name.clone())
927 } else {
928 None
929 }
930 })
931 .collect();
932
933 for op in &query.operations {
934 match op {
935 AlterOperation::AddColumn(col) => {
936 messages.push(format!("column '{}' added", col.name));
938 }
939 AlterOperation::DropColumn(name) => {
940 messages.push(format!("column '{}' dropped", name));
941 }
942 AlterOperation::RenameColumn { from, to } => {
943 messages.push(format!("column '{}' renamed to '{}'", from, to));
944 }
945 AlterOperation::AttachPartition { child, bound } => {
946 store.set_config_tree(
950 &format!("partition.{}.children.{}", query.name, child),
951 &crate::serde_json::Value::String(bound.clone()),
952 );
953 messages.push(format!(
954 "partition '{child}' attached to '{}' ({bound})",
955 query.name
956 ));
957 }
958 AlterOperation::DetachPartition { child } => {
959 store.set_config_tree(
960 &format!("partition.{}.children.{}", query.name, child),
961 &crate::serde_json::Value::Null,
962 );
963 messages.push(format!(
964 "partition '{child}' detached from '{}'",
965 query.name
966 ));
967 }
968 AlterOperation::EnableRowLevelSecurity => {
969 self.inner
970 .rls_enabled_tables
971 .write()
972 .insert(query.name.clone());
973 store.set_config_tree(
975 &format!("rls.enabled.{}", query.name),
976 &crate::serde_json::Value::Bool(true),
977 );
978 self.invalidate_plan_cache();
979 messages.push(format!("row level security enabled on '{}'", query.name));
980 }
981 AlterOperation::DisableRowLevelSecurity => {
982 self.inner.rls_enabled_tables.write().remove(&query.name);
983 store.set_config_tree(
984 &format!("rls.enabled.{}", query.name),
985 &crate::serde_json::Value::Null,
986 );
987 self.invalidate_plan_cache();
988 messages.push(format!("row level security disabled on '{}'", query.name));
989 }
990 AlterOperation::EnableTenancy { column } => {
992 store.set_config_tree(
993 &format!("tenant_tables.{}.column", query.name),
994 &crate::serde_json::Value::String(column.clone()),
995 );
996 self.register_tenant_table(&query.name, column);
997 self.invalidate_plan_cache();
998 messages.push(format!(
999 "tenancy enabled on '{}' by column '{column}'",
1000 query.name
1001 ));
1002 }
1003 AlterOperation::DisableTenancy => {
1004 store.set_config_tree(
1005 &format!("tenant_tables.{}.column", query.name),
1006 &crate::serde_json::Value::Null,
1007 );
1008 self.unregister_tenant_table(&query.name);
1009 self.invalidate_plan_cache();
1010 messages.push(format!("tenancy disabled on '{}'", query.name));
1011 }
1012 AlterOperation::SetAppendOnly(on) => {
1013 messages.push(format!(
1018 "append_only {} on '{}'",
1019 if *on { "enabled" } else { "disabled" },
1020 query.name
1021 ));
1022 }
1023 AlterOperation::SetVersioned(on) => {
1024 self.vcs_set_versioned(&query.name, *on)?;
1031 messages.push(format!(
1032 "versioned {} on '{}'",
1033 if *on { "enabled" } else { "disabled" },
1034 query.name
1035 ));
1036 }
1037 AlterOperation::EnableEvents(subscription) => {
1038 let mut subscription = subscription.clone();
1039 subscription.source = query.name.clone();
1040 validate_event_subscriptions(
1041 self,
1042 &query.name,
1043 std::slice::from_ref(&subscription),
1044 )?;
1045 ensure_event_target_queue(self, &subscription.target_queue)?;
1046 messages.push(format!(
1047 "events enabled on '{}' to '{}'",
1048 query.name, subscription.target_queue
1049 ));
1050 }
1051 AlterOperation::DisableEvents => {
1052 messages.push(format!("events disabled on '{}'", query.name));
1053 }
1054 AlterOperation::AddSubscription { name, descriptor } => {
1055 let mut sub = descriptor.clone();
1056 sub.name = name.clone();
1057 sub.source = query.name.clone();
1058 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
1059 ensure_event_target_queue(self, &sub.target_queue)?;
1060 messages.push(format!(
1061 "subscription '{}' added on '{}' to '{}'",
1062 name, query.name, sub.target_queue
1063 ));
1064 }
1065 AlterOperation::DropSubscription { name } => {
1066 messages.push(format!(
1067 "subscription '{}' dropped on '{}'",
1068 name, query.name
1069 ));
1070 }
1071 AlterOperation::AddSigner { pubkey } => {
1072 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1079 return Err(RedDBError::Query(format!(
1080 "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
1081 recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
1082 query.name
1083 )));
1084 }
1085 let actor = crate::runtime::impl_core::current_user_projected()
1086 .unwrap_or_else(|| "@system/alter".to_string());
1087 let changed = crate::runtime::signed_writes_kind::add_signer(
1088 &store,
1089 &query.name,
1090 *pubkey,
1091 &actor,
1092 );
1093 messages.push(format!(
1094 "signer {} on '{}'",
1095 if changed { "added" } else { "already present" },
1096 query.name
1097 ));
1098 }
1099 AlterOperation::RevokeSigner { pubkey } => {
1100 if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1101 return Err(RedDBError::Query(format!(
1102 "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1103 query.name
1104 )));
1105 }
1106 let actor = crate::runtime::impl_core::current_user_projected()
1107 .unwrap_or_else(|| "@system/alter".to_string());
1108 let changed = crate::runtime::signed_writes_kind::revoke_signer(
1109 &store,
1110 &query.name,
1111 pubkey,
1112 &actor,
1113 );
1114 messages.push(format!(
1115 "signer {} on '{}'",
1116 if changed {
1117 "revoked"
1118 } else {
1119 "already revoked"
1120 },
1121 query.name
1122 ));
1123 }
1124 AlterOperation::SetRetention { duration_ms } => {
1125 let existing = self.inner.db.collection_contract(&query.name);
1131 let has_ts_column = existing
1132 .as_ref()
1133 .map(retention_timestamp_column_exists)
1134 .unwrap_or(false);
1135 if !has_ts_column {
1136 return Err(RedDBError::Query(format!(
1137 "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1138 column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1139 or enable WITH timestamps = true before setting a \
1140 retention policy",
1141 query.name
1142 )));
1143 }
1144 messages.push(format!(
1145 "retention set to {duration_ms} ms on '{}'",
1146 query.name
1147 ));
1148 }
1149 AlterOperation::UnsetRetention => {
1150 messages.push(format!("retention cleared on '{}'", query.name));
1151 }
1152 AlterOperation::AddAnalytics(views) => {
1153 self.require_graph_for_analytics(&query.name)?;
1158 let existing = self.inner.db.collection_contract(&query.name);
1159 let enabled: std::collections::BTreeSet<crate::catalog::AnalyticsOutput> =
1160 existing
1161 .as_ref()
1162 .map(|c| c.analytics_config.iter().map(|v| v.output).collect())
1163 .unwrap_or_default();
1164 for view in views {
1165 if enabled.contains(&view.output) {
1166 messages.push(format!(
1169 "analytics '{}' already enabled on '{}'",
1170 view.output.as_str(),
1171 query.name
1172 ));
1173 } else {
1174 messages.push(format!(
1175 "analytics '{}' enabled on '{}'",
1176 view.output.as_str(),
1177 query.name
1178 ));
1179 }
1180 }
1181 }
1182 AlterOperation::DropAnalytics(output) => {
1183 self.require_graph_for_analytics(&query.name)?;
1184 let enabled = self
1185 .inner
1186 .db
1187 .collection_contract(&query.name)
1188 .map(|c| c.analytics_config.iter().any(|v| v.output == *output))
1189 .unwrap_or(false);
1190 if !enabled {
1191 return Err(RedDBError::Query(format!(
1194 "ALTER GRAPH DROP ANALYTICS: analytics output '{}' is not enabled on graph '{}'",
1195 output.as_str(),
1196 query.name
1197 )));
1198 }
1199 messages.push(format!(
1200 "analytics '{}' disabled on '{}'",
1201 output.as_str(),
1202 query.name
1203 ));
1204 }
1205 }
1206 }
1207
1208 let mut contract = self
1209 .inner
1210 .db
1211 .collection_contract(&query.name)
1212 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1213 apply_alter_operations_to_contract(&mut contract, &query.operations);
1214 contract.version = contract.version.saturating_add(1);
1215 contract.updated_at_unix_ms = current_unix_ms();
1216 self.inner
1217 .db
1218 .save_collection_contract(contract)
1219 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1220 if !fields_added.is_empty() || !fields_removed.is_empty() {
1224 let sub_names: Vec<String> = self
1225 .inner
1226 .db
1227 .collection_contract(&query.name)
1228 .map(|c| {
1229 c.subscriptions
1230 .iter()
1231 .filter(|s| s.enabled)
1232 .map(|s| s.name.clone())
1233 .collect()
1234 })
1235 .unwrap_or_default();
1236 if !sub_names.is_empty() {
1237 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1238 collection: query.name.clone(),
1239 subscription_names: sub_names.join(", "),
1240 fields_added: fields_added.join(", "),
1241 fields_removed: fields_removed.join(", "),
1242 lsn: self.cdc_current_lsn(),
1243 }
1244 .emit_global();
1245 }
1246 }
1247
1248 self.clear_table_planner_stats(&query.name);
1249 self.invalidate_result_cache();
1250 let post_alter_columns: Vec<String> = self
1255 .inner
1256 .db
1257 .collection_contract(&query.name)
1258 .map(|contract| {
1259 contract
1260 .declared_columns
1261 .iter()
1262 .map(|col| col.name.clone())
1263 .collect()
1264 })
1265 .unwrap_or_default();
1266 self.schema_vocabulary_apply(
1267 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1268 collection: query.name.clone(),
1269 columns: post_alter_columns,
1270 type_tags: Vec::new(),
1271 description: None,
1272 },
1273 );
1274
1275 let message = if messages.is_empty() {
1276 format!("table '{}' altered (no operations)", query.name)
1277 } else {
1278 format!("table '{}' altered: {}", query.name, messages.join(", "))
1279 };
1280
1281 Ok(RuntimeQueryResult::ok_message(
1282 raw_query.to_string(),
1283 &message,
1284 "alter",
1285 ))
1286 }
1287
1288 pub fn execute_explain_alter(
1295 &self,
1296 raw_query: &str,
1297 query: &ExplainAlterQuery,
1298 ) -> RedDBResult<RuntimeQueryResult> {
1299 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1303
1304 let current_contract = self.inner.db.collection_contract(&query.target.name);
1305
1306 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1307 .as_ref()
1308 .map(|c| c.declared_columns.clone())
1309 .unwrap_or_default();
1310
1311 let diff = super::schema_diff::compute_column_diff(
1312 &query.target.name,
1313 ¤t_columns,
1314 &query.target.columns,
1315 );
1316
1317 let rendered = match query.format {
1318 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1319 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1320 };
1321
1322 let format_label = match query.format {
1323 ExplainFormat::Sql => "sql",
1324 ExplainFormat::Json => "json",
1325 };
1326
1327 let columns = vec![
1328 "table".to_string(),
1329 "format".to_string(),
1330 "diff".to_string(),
1331 ];
1332 let row = vec![
1333 ("table".to_string(), Value::text(query.target.name.clone())),
1334 ("format".to_string(), Value::text(format_label.to_string())),
1335 ("diff".to_string(), Value::text(rendered)),
1336 ];
1337
1338 Ok(RuntimeQueryResult::ok_records(
1339 raw_query.to_string(),
1340 columns,
1341 vec![row],
1342 "explain",
1343 ))
1344 }
1345
1346 pub fn execute_create_index(
1351 &self,
1352 raw_query: &str,
1353 query: &CreateIndexQuery,
1354 ) -> RedDBResult<RuntimeQueryResult> {
1355 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1356 let store = self.inner.db.store();
1357
1358 let manager = store
1360 .get_collection(&query.table)
1361 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1362
1363 let method_kind = match query.method {
1364 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1365 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1366 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1367 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1368 };
1369
1370 let entities = manager.query_all(|_| true);
1380 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1381 entities
1382 .iter()
1383 .map(|e| {
1384 let fields = match &e.data {
1385 crate::storage::EntityData::Row(row) => {
1386 if let Some(ref named) = row.named {
1387 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1388 } else if let Some(ref schema) = row.schema {
1389 schema
1393 .iter()
1394 .zip(row.columns.iter())
1395 .map(|(k, v)| (k.clone(), v.clone()))
1396 .collect()
1397 } else {
1398 Vec::new()
1399 }
1400 }
1401 crate::storage::EntityData::Node(node) => node
1402 .properties
1403 .iter()
1404 .map(|(k, v)| (k.clone(), v.clone()))
1405 .collect(),
1406 _ => Vec::new(),
1407 };
1408 (e.id, fields)
1409 })
1410 .collect();
1411
1412 let indexed_count = self
1414 .inner
1415 .index_store
1416 .create_index(
1417 &query.name,
1418 &query.table,
1419 &query.columns,
1420 method_kind,
1421 query.unique,
1422 &entity_fields,
1423 )
1424 .map_err(RedDBError::Internal)?;
1425
1426 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1427 &query.table,
1428 &entity_fields,
1429 );
1430 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1431 self.invalidate_plan_cache();
1432
1433 self.inner
1435 .index_store
1436 .register(super::index_store::RegisteredIndex {
1437 name: query.name.clone(),
1438 collection: query.table.clone(),
1439 columns: query.columns.clone(),
1440 method: method_kind,
1441 unique: query.unique,
1442 });
1443 self.persist_runtime_index_descriptor(super::index_store::RegisteredIndex {
1444 name: query.name.clone(),
1445 collection: query.table.clone(),
1446 columns: query.columns.clone(),
1447 method: method_kind,
1448 unique: query.unique,
1449 })?;
1450 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1454 collection: query.table.clone(),
1455 index: query.name.clone(),
1456 columns: query.columns.clone(),
1457 });
1458
1459 let method_str = format!("{}", query.method);
1460 let unique_str = if query.unique { "unique " } else { "" };
1461 let cols = query.columns.join(", ");
1462
1463 Ok(RuntimeQueryResult::ok_message(
1464 raw_query.to_string(),
1465 &format!(
1466 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1467 unique_str, query.name, query.table, cols, method_str, indexed_count
1468 ),
1469 "create",
1470 ))
1471 }
1472
1473 pub fn execute_drop_index(
1477 &self,
1478 raw_query: &str,
1479 query: &DropIndexQuery,
1480 ) -> RedDBResult<RuntimeQueryResult> {
1481 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1482 let store = self.inner.db.store();
1483
1484 if store.get_collection(&query.table).is_none() {
1486 if query.if_exists {
1487 return Ok(RuntimeQueryResult::ok_message(
1488 raw_query.to_string(),
1489 &format!("table '{}' does not exist", query.table),
1490 "drop",
1491 ));
1492 }
1493 return Err(RedDBError::NotFound(format!(
1494 "table '{}' not found",
1495 query.table
1496 )));
1497 }
1498
1499 self.inner.index_store.drop_index(&query.name, &query.table);
1501 self.persist_runtime_index_drop(&query.table, &query.name)?;
1502 self.invalidate_plan_cache();
1503 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1505 collection: query.table.clone(),
1506 index: query.name.clone(),
1507 });
1508
1509 Ok(RuntimeQueryResult::ok_message(
1510 raw_query.to_string(),
1511 &format!("index '{}' dropped from '{}'", query.name, query.table),
1512 "drop",
1513 ))
1514 }
1515
1516 fn execute_drop_typed_collection(
1517 &self,
1518 raw_query: &str,
1519 name: &str,
1520 if_exists: bool,
1521 expected_model: CollectionModel,
1522 label: &str,
1523 ) -> RedDBResult<RuntimeQueryResult> {
1524 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1525 if is_system_schema_name(name) {
1526 return Err(RedDBError::Query("system schema is read-only".to_string()));
1527 }
1528 let store = self.inner.db.store();
1529 if store.get_collection(name).is_none() {
1530 if if_exists {
1531 return Ok(RuntimeQueryResult::ok_message(
1532 raw_query.to_string(),
1533 &format!("{label} '{name}' does not exist"),
1534 "drop",
1535 ));
1536 }
1537 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1538 }
1539
1540 let actual = self
1541 .inner
1542 .db
1543 .collection_contract(name)
1544 .map(|contract| contract.declared_model)
1545 .map(Ok)
1546 .unwrap_or_else(|| {
1547 polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())
1548 })?;
1549 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1550 self.drop_collection_storage(raw_query, name, label)
1551 }
1552
1553 pub fn execute_truncate(
1554 &self,
1555 raw_query: &str,
1556 query: &TruncateQuery,
1557 ) -> RedDBResult<RuntimeQueryResult> {
1558 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1559 if is_system_schema_name(&query.name) {
1560 return Err(RedDBError::Query("system schema is read-only".to_string()));
1561 }
1562
1563 let label = query
1564 .model
1565 .map(polymorphic_resolver::model_name)
1566 .unwrap_or("collection");
1567 let store = self.inner.db.store();
1568 if store.get_collection(&query.name).is_none() {
1569 if query.if_exists {
1570 return Ok(RuntimeQueryResult::ok_message(
1571 raw_query.to_string(),
1572 &format!("{label} '{}' does not exist", query.name),
1573 "truncate",
1574 ));
1575 }
1576 return Err(RedDBError::NotFound(format!(
1577 "{label} '{}' not found",
1578 query.name
1579 )));
1580 }
1581
1582 let actual =
1583 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1584 if let Some(expected) = query.model {
1585 polymorphic_resolver::ensure_model_match(expected, actual)?;
1586 }
1587
1588 if actual == CollectionModel::Queue {
1589 return self.execute_queue_command(
1590 raw_query,
1591 &QueueCommand::Purge {
1592 queue: query.name.clone(),
1593 },
1594 );
1595 }
1596
1597 let affected = self.truncate_collection_entities(&query.name)?;
1599 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1601 self.inner.db.invalidate_vector_index(&query.name);
1602 self.clear_table_planner_stats(&query.name);
1603 self.invalidate_result_cache();
1604
1605 Ok(RuntimeQueryResult::ok_message(
1606 raw_query.to_string(),
1607 &format!(
1608 "{affected} entities truncated from {label} '{}'",
1609 query.name
1610 ),
1611 "truncate",
1612 ))
1613 }
1614
1615 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1616 let store = self.inner.db.store();
1617 let Some(manager) = store.get_collection(name) else {
1618 return Ok(0);
1619 };
1620 let entities = manager.query_all(|_| true);
1621 if entities.is_empty() {
1622 return Ok(0);
1623 }
1624
1625 for entity in &entities {
1626 let fields = entity_index_fields(&entity.data);
1627 self.inner
1628 .index_store
1629 .index_entity_delete(name, entity.id, &fields)
1630 .map_err(RedDBError::Internal)?;
1631 }
1632
1633 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1634 let deleted_ids = store
1635 .delete_batch(name, &ids)
1636 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1637 for id in &deleted_ids {
1638 store.context_index().remove_entity(*id);
1639 }
1640 Ok(deleted_ids.len() as u64)
1641 }
1642
1643 fn drop_collection_storage(
1644 &self,
1645 raw_query: &str,
1646 name: &str,
1647 label: &str,
1648 ) -> RedDBResult<RuntimeQueryResult> {
1649 let store = self.inner.db.store();
1650
1651 let final_count = store
1654 .get_collection(name)
1655 .map(|manager| manager.query_all(|_| true).len() as u64)
1656 .unwrap_or(0);
1657 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1658 self,
1659 name,
1660 final_count,
1661 )?;
1662
1663 let orphaned_indices: Vec<String> = self
1664 .inner
1665 .index_store
1666 .list_indices(name)
1667 .into_iter()
1668 .map(|index| index.name)
1669 .collect();
1670 for index_name in &orphaned_indices {
1671 self.inner.index_store.drop_index(index_name, name);
1672 }
1673
1674 store
1675 .drop_collection(name)
1676 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1677 self.inner.db.invalidate_vector_index(name);
1678 self.inner.db.clear_collection_default_ttl_ms(name);
1679 self.inner
1680 .db
1681 .remove_collection_contract(name)
1682 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1683 self.clear_table_planner_stats(name);
1684 self.invalidate_result_cache();
1685 if let Some(store) = self.inner.auth_store.read().clone() {
1686 store.invalidate_visible_collections_cache();
1687 }
1688 self.inner
1689 .db
1690 .persist_metadata()
1691 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1692 self.schema_vocabulary_apply(
1693 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1694 collection: name.to_string(),
1695 },
1696 );
1697
1698 Ok(RuntimeQueryResult::ok_message(
1699 raw_query.to_string(),
1700 &format!("{label} '{name}' dropped"),
1701 "drop",
1702 ))
1703 }
1704}
1705
1706pub(crate) fn is_system_schema_name(name: &str) -> bool {
1707 name == "red" || name.starts_with("red.") || name.starts_with("__red_schema_")
1708}
1709
1710fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1711 match data {
1712 EntityData::Row(row) => {
1713 if let Some(ref named) = row.named {
1714 named
1715 .iter()
1716 .map(|(key, value)| (key.clone(), value.clone()))
1717 .collect()
1718 } else if let Some(ref schema) = row.schema {
1719 schema
1720 .iter()
1721 .zip(row.columns.iter())
1722 .map(|(key, value)| (key.clone(), value.clone()))
1723 .collect()
1724 } else {
1725 Vec::new()
1726 }
1727 }
1728 EntityData::Node(node) => node
1729 .properties
1730 .iter()
1731 .map(|(key, value)| (key.clone(), value.clone()))
1732 .collect(),
1733 _ => Vec::new(),
1734 }
1735}
1736
1737fn validate_ai_policy_modalities(policy: &crate::catalog::AiPolicy) -> RedDBResult<()> {
1743 use crate::runtime::ai::provider_capabilities::{Modality, Registry};
1744 let registry = Registry::new();
1745 let check = |provider: &str, model: &str, modality: Modality| -> RedDBResult<()> {
1746 registry
1747 .validate_policy_modality(provider, model, modality)
1748 .map_err(|err| RedDBError::Query(err.to_string()))
1749 };
1750 if let Some(embed) = &policy.embed {
1751 check(&embed.provider, &embed.model, Modality::Embed)?;
1752 }
1753 if let Some(moderate) = &policy.moderate {
1754 check(&moderate.provider, &moderate.model, Modality::Moderate)?;
1755 }
1756 if let Some(vision) = &policy.vision {
1757 check(&vision.provider, &vision.model, Modality::Vision)?;
1758 }
1759 Ok(())
1760}
1761
1762fn collection_contract_from_create_table(
1763 query: &CreateTableQuery,
1764) -> RedDBResult<crate::physical::CollectionContract> {
1765 let now = current_unix_ms();
1766 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1767 .columns
1768 .iter()
1769 .map(declared_column_contract_from_ddl)
1770 .collect();
1771 if query.timestamps {
1772 declared_columns.push(crate::physical::DeclaredColumnContract {
1776 name: "created_at".to_string(),
1777 data_type: "BIGINT".to_string(),
1778 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1779 not_null: true,
1780 default: None,
1781 compress: None,
1782 unique: false,
1783 primary_key: false,
1784 enum_variants: Vec::new(),
1785 array_element: None,
1786 decimal_precision: None,
1787 });
1788 declared_columns.push(crate::physical::DeclaredColumnContract {
1789 name: "updated_at".to_string(),
1790 data_type: "BIGINT".to_string(),
1791 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1792 not_null: true,
1793 default: None,
1794 compress: None,
1795 unique: false,
1796 primary_key: false,
1797 enum_variants: Vec::new(),
1798 array_element: None,
1799 decimal_precision: None,
1800 });
1801 }
1802 Ok(crate::physical::CollectionContract {
1803 name: query.name.clone(),
1804 declared_model: crate::catalog::CollectionModel::Table,
1805 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1806 origin: crate::physical::ContractOrigin::Explicit,
1807 version: 1,
1808 created_at_unix_ms: now,
1809 updated_at_unix_ms: now,
1810 default_ttl_ms: query.default_ttl_ms,
1811 vector_dimension: None,
1812 vector_metric: None,
1813 context_index_fields: query.context_index_fields.clone(),
1814 declared_columns,
1815 table_def: Some(build_table_def_from_create_table(query)?),
1816 timestamps_enabled: query.timestamps,
1817 context_index_enabled: query.context_index_enabled
1818 || !query.context_index_fields.is_empty(),
1819 metrics_raw_retention_ms: None,
1820 metrics_rollup_policies: Vec::new(),
1821 metrics_tenant_identity: None,
1822 metrics_namespace: None,
1823 append_only: query.append_only,
1824 subscriptions: query.subscriptions.clone(),
1825 analytics_config: Vec::new(),
1826 session_key: None,
1827 session_gap_ms: None,
1828 retention_duration_ms: None,
1829 analytical_storage: None,
1830 ai_policy: query.ai_policy.clone(),
1831 })
1832}
1833
1834fn default_collection_contract_for_existing_table(
1835 name: &str,
1836) -> crate::physical::CollectionContract {
1837 let now = current_unix_ms();
1838 crate::physical::CollectionContract {
1839 name: name.to_string(),
1840 declared_model: crate::catalog::CollectionModel::Table,
1841 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1842 origin: crate::physical::ContractOrigin::Explicit,
1843 version: 0,
1844 created_at_unix_ms: now,
1845 updated_at_unix_ms: now,
1846 default_ttl_ms: None,
1847 vector_dimension: None,
1848 vector_metric: None,
1849 context_index_fields: Vec::new(),
1850 declared_columns: Vec::new(),
1851 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1852 timestamps_enabled: false,
1853 context_index_enabled: false,
1854 metrics_raw_retention_ms: None,
1855 metrics_rollup_policies: Vec::new(),
1856 metrics_tenant_identity: None,
1857 metrics_namespace: None,
1858 append_only: false,
1859 subscriptions: Vec::new(),
1860 analytics_config: Vec::new(),
1861 session_key: None,
1862 session_gap_ms: None,
1863 retention_duration_ms: None,
1864 analytical_storage: None,
1865
1866 ai_policy: None,
1867 }
1868}
1869
1870fn keyed_collection_contract(
1871 name: &str,
1872 model: crate::catalog::CollectionModel,
1873 analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
1874) -> crate::physical::CollectionContract {
1875 let now = current_unix_ms();
1876 crate::physical::CollectionContract {
1877 name: name.to_string(),
1878 declared_model: model,
1879 schema_mode: crate::catalog::SchemaMode::Dynamic,
1880 origin: crate::physical::ContractOrigin::Explicit,
1881 version: 1,
1882 created_at_unix_ms: now,
1883 updated_at_unix_ms: now,
1884 default_ttl_ms: None,
1885 vector_dimension: None,
1886 vector_metric: None,
1887 context_index_fields: Vec::new(),
1888 declared_columns: Vec::new(),
1889 table_def: None,
1890 timestamps_enabled: false,
1891 context_index_enabled: false,
1892 metrics_raw_retention_ms: None,
1893 metrics_rollup_policies: Vec::new(),
1894 metrics_tenant_identity: None,
1895 metrics_namespace: None,
1896 append_only: false,
1897 subscriptions: Vec::new(),
1898 analytics_config,
1899 session_key: None,
1900 session_gap_ms: None,
1901 retention_duration_ms: None,
1902 analytical_storage: None,
1903
1904 ai_policy: None,
1905 }
1906}
1907
1908fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1909 let now = current_unix_ms();
1910 crate::physical::CollectionContract {
1911 name: query.name.clone(),
1912 declared_model: crate::catalog::CollectionModel::Metrics,
1913 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1914 origin: crate::physical::ContractOrigin::Explicit,
1915 version: 1,
1916 created_at_unix_ms: now,
1917 updated_at_unix_ms: now,
1918 default_ttl_ms: query.default_ttl_ms,
1919 vector_dimension: None,
1920 vector_metric: None,
1921 context_index_fields: Vec::new(),
1922 declared_columns: Vec::new(),
1923 table_def: None,
1924 timestamps_enabled: false,
1925 context_index_enabled: false,
1926 metrics_raw_retention_ms: query.default_ttl_ms,
1927 metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1928 metrics_tenant_identity: Some(
1929 query
1930 .tenant_by
1931 .clone()
1932 .unwrap_or_else(|| "current_tenant".to_string()),
1933 ),
1934 metrics_namespace: Some("default".to_string()),
1935 append_only: true,
1936 subscriptions: Vec::new(),
1937 analytics_config: Vec::new(),
1938 session_key: None,
1939 session_gap_ms: None,
1940 retention_duration_ms: None,
1941 analytical_storage: None,
1942
1943 ai_policy: None,
1944 }
1945}
1946
1947fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1948 let now = current_unix_ms();
1949 crate::physical::CollectionContract {
1950 name: query.name.clone(),
1951 declared_model: crate::catalog::CollectionModel::Vector,
1952 schema_mode: crate::catalog::SchemaMode::Dynamic,
1953 origin: crate::physical::ContractOrigin::Explicit,
1954 version: 1,
1955 created_at_unix_ms: now,
1956 updated_at_unix_ms: now,
1957 default_ttl_ms: None,
1958 vector_dimension: Some(query.dimension),
1959 vector_metric: Some(query.metric),
1960 context_index_fields: Vec::new(),
1961 declared_columns: Vec::new(),
1962 table_def: None,
1963 timestamps_enabled: false,
1964 context_index_enabled: false,
1965 metrics_raw_retention_ms: None,
1966 metrics_rollup_policies: Vec::new(),
1967 metrics_tenant_identity: None,
1968 metrics_namespace: None,
1969 append_only: false,
1970 subscriptions: Vec::new(),
1971 analytics_config: Vec::new(),
1972 session_key: None,
1973 session_gap_ms: None,
1974 retention_duration_ms: None,
1975 analytical_storage: None,
1976
1977 ai_policy: None,
1978 }
1979}
1980
1981fn declared_column_contract_from_ddl(
1982 column: &CreateColumnDef,
1983) -> crate::physical::DeclaredColumnContract {
1984 crate::physical::DeclaredColumnContract {
1985 name: column.name.clone(),
1986 data_type: column.data_type.clone(),
1987 sql_type: Some(column.sql_type.clone()),
1988 not_null: column.not_null,
1989 default: column.default.clone(),
1990 compress: column.compress,
1991 unique: column.unique,
1992 primary_key: column.primary_key,
1993 enum_variants: column.enum_variants.clone(),
1994 array_element: column.array_element.clone(),
1995 decimal_precision: column.decimal_precision,
1996 }
1997}
1998
1999fn apply_alter_operations_to_contract(
2000 contract: &mut crate::physical::CollectionContract,
2001 operations: &[AlterOperation],
2002) {
2003 if contract.table_def.is_none() {
2004 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
2005 }
2006 for operation in operations {
2007 match operation {
2008 AlterOperation::AddColumn(column) => {
2009 if !contract
2010 .declared_columns
2011 .iter()
2012 .any(|existing| existing.name == column.name)
2013 {
2014 contract
2015 .declared_columns
2016 .push(declared_column_contract_from_ddl(column));
2017 }
2018 if let Some(table_def) = contract.table_def.as_mut() {
2019 if table_def.get_column(&column.name).is_none() {
2020 if let Ok(column_def) = column_def_from_ddl(column) {
2021 if column.primary_key {
2022 table_def.primary_key.push(column.name.clone());
2023 table_def.constraints.push(
2024 crate::storage::schema::Constraint::new(
2025 format!("pk_{}", column.name),
2026 crate::storage::schema::ConstraintType::PrimaryKey,
2027 )
2028 .on_columns(vec![column.name.clone()]),
2029 );
2030 }
2031 if column.unique {
2032 table_def.constraints.push(
2033 crate::storage::schema::Constraint::new(
2034 format!("uniq_{}", column.name),
2035 crate::storage::schema::ConstraintType::Unique,
2036 )
2037 .on_columns(vec![column.name.clone()]),
2038 );
2039 }
2040 if column.not_null {
2041 table_def.constraints.push(
2042 crate::storage::schema::Constraint::new(
2043 format!("not_null_{}", column.name),
2044 crate::storage::schema::ConstraintType::NotNull,
2045 )
2046 .on_columns(vec![column.name.clone()]),
2047 );
2048 }
2049 table_def.columns.push(column_def);
2050 }
2051 }
2052 }
2053 }
2054 AlterOperation::DropColumn(name) => {
2055 contract
2056 .declared_columns
2057 .retain(|column| column.name != *name);
2058 if let Some(table_def) = contract.table_def.as_mut() {
2059 if let Some(index) = table_def.column_index(name) {
2060 table_def.columns.remove(index);
2061 }
2062 table_def.primary_key.retain(|column| column != name);
2063 table_def.constraints.retain(|constraint| {
2064 !constraint.columns.iter().any(|column| column == name)
2065 });
2066 table_def
2067 .indexes
2068 .retain(|index| !index.columns.iter().any(|column| column == name));
2069 }
2070 }
2071 AlterOperation::RenameColumn { from, to } => {
2072 if contract
2073 .declared_columns
2074 .iter()
2075 .any(|column| column.name == *to)
2076 {
2077 continue;
2078 }
2079 if let Some(column) = contract
2080 .declared_columns
2081 .iter_mut()
2082 .find(|column| column.name == *from)
2083 {
2084 column.name = to.clone();
2085 }
2086 if let Some(table_def) = contract.table_def.as_mut() {
2087 if let Some(column) = table_def
2088 .columns
2089 .iter_mut()
2090 .find(|column| column.name == *from)
2091 {
2092 column.name = to.clone();
2093 }
2094 for primary_key in &mut table_def.primary_key {
2095 if *primary_key == *from {
2096 *primary_key = to.clone();
2097 }
2098 }
2099 for constraint in &mut table_def.constraints {
2100 for column in &mut constraint.columns {
2101 if *column == *from {
2102 *column = to.clone();
2103 }
2104 }
2105 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
2106 for column in ref_columns {
2107 if *column == *from {
2108 *column = to.clone();
2109 }
2110 }
2111 }
2112 }
2113 for index in &mut table_def.indexes {
2114 for column in &mut index.columns {
2115 if *column == *from {
2116 *column = to.clone();
2117 }
2118 }
2119 }
2120 }
2121 }
2122 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
2125 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
2129 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
2132 AlterOperation::SetAppendOnly(on) => {
2133 contract.append_only = *on;
2134 }
2135 AlterOperation::SetVersioned(_) => {}
2138 AlterOperation::EnableEvents(subscription) => {
2139 let mut subscription = subscription.clone();
2140 subscription.source = contract.name.clone();
2141 subscription.enabled = true;
2142 if let Some(existing) = contract
2143 .subscriptions
2144 .iter_mut()
2145 .find(|existing| existing.target_queue == subscription.target_queue)
2146 {
2147 *existing = subscription;
2148 } else {
2149 contract.subscriptions.push(subscription);
2150 }
2151 }
2152 AlterOperation::DisableEvents => {
2153 for subscription in &mut contract.subscriptions {
2154 subscription.enabled = false;
2155 }
2156 }
2157 AlterOperation::AddSubscription { name, descriptor } => {
2158 let mut sub = descriptor.clone();
2159 sub.name = name.clone();
2160 sub.source = contract.name.clone();
2161 sub.enabled = true;
2162 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
2163 {
2164 *existing = sub;
2165 } else {
2166 contract.subscriptions.push(sub);
2167 }
2168 }
2169 AlterOperation::DropSubscription { name } => {
2170 contract.subscriptions.retain(|s| s.name != *name);
2171 }
2172 AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
2177 AlterOperation::SetRetention { duration_ms } => {
2178 contract.retention_duration_ms = Some(*duration_ms);
2179 }
2180 AlterOperation::UnsetRetention => {
2181 contract.retention_duration_ms = None;
2182 }
2183 AlterOperation::AddAnalytics(views) => {
2187 for view in views {
2188 if !contract
2192 .analytics_config
2193 .iter()
2194 .any(|existing| existing.output == view.output)
2195 {
2196 contract.analytics_config.push(view.clone());
2197 }
2198 }
2199 }
2200 AlterOperation::DropAnalytics(output) => {
2201 contract
2202 .analytics_config
2203 .retain(|view| view.output != *output);
2204 }
2205 }
2206 }
2207}
2208
2209pub(crate) fn retention_timestamp_column_exists(
2214 contract: &crate::physical::CollectionContract,
2215) -> bool {
2216 if contract.timestamps_enabled {
2217 return true;
2218 }
2219 if matches!(
2220 contract.declared_model,
2221 crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
2222 ) {
2223 return true;
2227 }
2228 contract
2229 .declared_columns
2230 .iter()
2231 .any(|column| is_temporal_data_type(&column.data_type))
2232}
2233
2234fn is_temporal_data_type(data_type: &str) -> bool {
2235 let upper = data_type.to_ascii_uppercase();
2236 matches!(
2237 upper.as_str(),
2238 "TIMESTAMP" | "TIMESTAMPMS" | "TIMESTAMP_MS" | "DATETIME" | "DATE"
2239 )
2240}
2241
2242fn validate_event_subscriptions(
2243 runtime: &RedDBRuntime,
2244 source: &str,
2245 subscriptions: &[crate::catalog::SubscriptionDescriptor],
2246) -> RedDBResult<()> {
2247 for subscription in subscriptions
2248 .iter()
2249 .filter(|subscription| subscription.enabled)
2250 {
2251 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2252 return Err(RedDBError::Query(
2253 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2254 ));
2255 }
2256 validate_subscription_auth(runtime, source, subscription)?;
2257 if subscription.target_queue == source
2258 || subscription_would_create_cycle(
2259 &runtime.inner.db,
2260 source,
2261 &subscription.target_queue,
2262 )
2263 {
2264 return Err(RedDBError::Query(
2265 "subscription would create cycle".to_string(),
2266 ));
2267 }
2268 audit_subscription_redact_gap(runtime, source, subscription);
2269 }
2270 Ok(())
2271}
2272
2273fn validate_subscription_auth(
2274 runtime: &RedDBRuntime,
2275 source: &str,
2276 subscription: &crate::catalog::SubscriptionDescriptor,
2277) -> RedDBResult<()> {
2278 let auth_store = match runtime.inner.auth_store.read().clone() {
2279 Some(store) => store,
2280 None => return Ok(()),
2281 };
2282 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2283 Some(identity) => identity,
2284 None => return Ok(()),
2285 };
2286 let tenant = crate::runtime::impl_core::current_tenant();
2287 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2288
2289 if auth_store.iam_authorization_enabled() {
2290 let ctx = crate::auth::policies::EvalContext {
2291 principal_tenant: tenant.clone(),
2292 current_tenant: tenant.clone(),
2293 peer_ip: None,
2294 mfa_present: false,
2295 now_ms: crate::auth::now_ms(),
2296 principal_is_admin_role: role == crate::auth::Role::Admin,
2297 principal_is_platform_scoped: principal.tenant.is_none(),
2298 };
2299 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2300 if let Some(t) = tenant.as_deref() {
2301 source_resource = source_resource.with_tenant(t.to_string());
2302 }
2303 if !auth_store.check_policy_authz_with_role(
2304 &principal,
2305 "select",
2306 &source_resource,
2307 &ctx,
2308 role,
2309 ) {
2310 return Err(RedDBError::Query(format!(
2311 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2312 principal, source_resource.kind, source_resource.name
2313 )));
2314 }
2315
2316 let mut target_resource =
2317 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2318 if let Some(t) = tenant.as_deref() {
2319 target_resource = target_resource.with_tenant(t.to_string());
2320 }
2321 if !auth_store.check_policy_authz_with_role(
2322 &principal,
2323 "write",
2324 &target_resource,
2325 &ctx,
2326 role,
2327 ) {
2328 return Err(RedDBError::Query(format!(
2329 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2330 principal, target_resource.kind, target_resource.name
2331 )));
2332 }
2333 return Ok(());
2334 }
2335
2336 let ctx = crate::auth::privileges::AuthzContext {
2337 principal: &username,
2338 effective_role: role,
2339 tenant: tenant.as_deref(),
2340 };
2341 auth_store
2342 .check_grant(
2343 &ctx,
2344 crate::auth::privileges::Action::Select,
2345 &crate::auth::privileges::Resource::table_from_name(source),
2346 )
2347 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2348 auth_store
2349 .check_grant(
2350 &ctx,
2351 crate::auth::privileges::Action::Insert,
2352 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2353 )
2354 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2355 Ok(())
2356}
2357
2358fn audit_subscription_redact_gap(
2359 runtime: &RedDBRuntime,
2360 source: &str,
2361 subscription: &crate::catalog::SubscriptionDescriptor,
2362) {
2363 let auth_store = match runtime.inner.auth_store.read().clone() {
2364 Some(store) if store.iam_authorization_enabled() => store,
2365 _ => return,
2366 };
2367 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2368 Some(identity) => identity,
2369 None => return,
2370 };
2371 let tenant = crate::runtime::impl_core::current_tenant();
2372 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2373 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2374 if missing.is_empty() {
2375 return;
2376 }
2377
2378 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2379 tracing::warn!(
2380 target: "reddb::operator",
2381 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2382 source,
2383 subscription.target_queue,
2384 columns
2385 );
2386 let mut event = AuditEvent::builder("subscription_redact_gap")
2387 .principal(username)
2388 .source(AuditAuthSource::System)
2389 .resource(format!(
2390 "subscription:{}->{}",
2391 source, subscription.target_queue
2392 ))
2393 .outcome(Outcome::Success)
2394 .field(AuditFieldEscaper::field("source", source))
2395 .field(AuditFieldEscaper::field(
2396 "target_queue",
2397 subscription.target_queue.clone(),
2398 ))
2399 .field(AuditFieldEscaper::field(
2400 "subscription",
2401 subscription.name.clone(),
2402 ))
2403 .field(AuditFieldEscaper::field("columns", columns))
2404 .field(AuditFieldEscaper::field("role", role.as_str()));
2405 if let Some(t) = tenant {
2406 event = event.tenant(t);
2407 }
2408 runtime.inner.audit_log.record_event(event.build());
2409}
2410
2411fn subscription_redact_gap_columns(
2412 auth_store: &crate::auth::store::AuthStore,
2413 principal: &crate::auth::UserId,
2414 source: &str,
2415 subscription: &crate::catalog::SubscriptionDescriptor,
2416) -> BTreeSet<String> {
2417 let redacted: HashSet<String> = subscription
2418 .redact_fields
2419 .iter()
2420 .map(|field| field.to_ascii_lowercase())
2421 .collect();
2422 auth_store
2423 .effective_policies(principal)
2424 .iter()
2425 .flat_map(|policy| policy.statements.iter())
2426 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2427 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2428 .flat_map(|statement| statement.resources.iter())
2429 .filter_map(|resource| denied_column_for_source(resource, source))
2430 .filter(|column| !redact_covers_column(&redacted, source, column))
2431 .collect()
2432}
2433
2434fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2435 match pattern {
2436 crate::auth::policies::ActionPattern::Wildcard => true,
2437 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2438 crate::auth::policies::ActionPattern::Prefix(prefix) => {
2439 "select".len() > prefix.len() + 1
2440 && "select".starts_with(prefix)
2441 && "select".as_bytes()[prefix.len()] == b':'
2442 }
2443 }
2444}
2445
2446fn denied_column_for_source(
2447 resource: &crate::auth::policies::ResourcePattern,
2448 source: &str,
2449) -> Option<String> {
2450 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2451 return None;
2452 };
2453 if kind != "column" {
2454 return None;
2455 }
2456 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2457 (column.table_resource_name() == source).then_some(column.column)
2458}
2459
2460fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
2461 let column = column.to_ascii_lowercase();
2462 let qualified = format!("{}.{}", source.to_ascii_lowercase(), column);
2463 redacted.contains("*") || redacted.contains(&column) || redacted.contains(&qualified)
2464}
2465
2466fn subscription_would_create_cycle(
2467 db: &crate::storage::unified::devx::RedDB,
2468 source: &str,
2469 target: &str,
2470) -> bool {
2471 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2472 for contract in db.collection_contracts() {
2473 for subscription in contract
2474 .subscriptions
2475 .into_iter()
2476 .filter(|subscription| subscription.enabled)
2477 {
2478 graph
2479 .entry(subscription.source)
2480 .or_default()
2481 .push(subscription.target_queue);
2482 }
2483 }
2484 graph
2485 .entry(source.to_string())
2486 .or_default()
2487 .push(target.to_string());
2488
2489 let mut stack = vec![target.to_string()];
2490 let mut seen = HashSet::new();
2491 while let Some(node) = stack.pop() {
2492 if node == source {
2493 return true;
2494 }
2495 if !seen.insert(node.clone()) {
2496 continue;
2497 }
2498 if let Some(next) = graph.get(&node) {
2499 stack.extend(next.iter().cloned());
2500 }
2501 }
2502 false
2503}
2504
2505pub(crate) fn ensure_event_target_queue_pub(
2506 runtime: &RedDBRuntime,
2507 queue: &str,
2508) -> RedDBResult<()> {
2509 ensure_event_target_queue(runtime, queue)
2510}
2511
2512fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2513 let store = runtime.inner.db.store();
2514 if store.get_collection(queue).is_some() {
2515 return Ok(());
2516 }
2517 store
2518 .create_collection(queue)
2519 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2520 runtime
2521 .inner
2522 .db
2523 .save_collection_contract(event_queue_collection_contract(queue))
2524 .map_err(|err| RedDBError::Internal(err.to_string()))?;
2525 store.set_config_tree(
2526 &format!("queue.{queue}.mode"),
2527 &crate::serde_json::Value::String("fanout".to_string()),
2528 );
2529 Ok(())
2530}
2531
2532fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2533 let now = current_unix_ms();
2534 crate::physical::CollectionContract {
2535 name: queue.to_string(),
2536 declared_model: crate::catalog::CollectionModel::Queue,
2537 schema_mode: crate::catalog::SchemaMode::Dynamic,
2538 origin: crate::physical::ContractOrigin::Implicit,
2539 version: 1,
2540 created_at_unix_ms: now,
2541 updated_at_unix_ms: now,
2542 default_ttl_ms: None,
2543 vector_dimension: None,
2544 vector_metric: None,
2545 context_index_fields: Vec::new(),
2546 declared_columns: Vec::new(),
2547 table_def: None,
2548 timestamps_enabled: false,
2549 context_index_enabled: false,
2550 metrics_raw_retention_ms: None,
2551 metrics_rollup_policies: Vec::new(),
2552 metrics_tenant_identity: None,
2553 metrics_namespace: None,
2554 append_only: true,
2555 subscriptions: Vec::new(),
2556 analytics_config: Vec::new(),
2557 session_key: None,
2558 session_gap_ms: None,
2559 retention_duration_ms: None,
2560 analytical_storage: None,
2561
2562 ai_policy: None,
2563 }
2564}
2565
2566fn build_table_def_from_create_table(
2567 query: &CreateTableQuery,
2568) -> RedDBResult<crate::storage::schema::TableDef> {
2569 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2570 for column in &query.columns {
2571 if column.primary_key {
2572 table.primary_key.push(column.name.clone());
2573 table.constraints.push(
2574 crate::storage::schema::Constraint::new(
2575 format!("pk_{}", column.name),
2576 crate::storage::schema::ConstraintType::PrimaryKey,
2577 )
2578 .on_columns(vec![column.name.clone()]),
2579 );
2580 }
2581 if column.unique {
2582 table.constraints.push(
2583 crate::storage::schema::Constraint::new(
2584 format!("uniq_{}", column.name),
2585 crate::storage::schema::ConstraintType::Unique,
2586 )
2587 .on_columns(vec![column.name.clone()]),
2588 );
2589 }
2590 if column.not_null {
2591 table.constraints.push(
2592 crate::storage::schema::Constraint::new(
2593 format!("not_null_{}", column.name),
2594 crate::storage::schema::ConstraintType::NotNull,
2595 )
2596 .on_columns(vec![column.name.clone()]),
2597 );
2598 }
2599 table.columns.push(column_def_from_ddl(column)?);
2600 }
2601 if query.timestamps {
2606 table.columns.push(
2607 crate::storage::schema::ColumnDef::new(
2608 "created_at".to_string(),
2609 crate::storage::schema::DataType::UnsignedInteger,
2610 )
2611 .not_null(),
2612 );
2613 table.columns.push(
2614 crate::storage::schema::ColumnDef::new(
2615 "updated_at".to_string(),
2616 crate::storage::schema::DataType::UnsignedInteger,
2617 )
2618 .not_null(),
2619 );
2620 table.constraints.push(
2621 crate::storage::schema::Constraint::new(
2622 "not_null_created_at".to_string(),
2623 crate::storage::schema::ConstraintType::NotNull,
2624 )
2625 .on_columns(vec!["created_at".to_string()]),
2626 );
2627 table.constraints.push(
2628 crate::storage::schema::Constraint::new(
2629 "not_null_updated_at".to_string(),
2630 crate::storage::schema::ConstraintType::NotNull,
2631 )
2632 .on_columns(vec!["updated_at".to_string()]),
2633 );
2634 }
2635 table
2636 .validate()
2637 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2638 Ok(table)
2639}
2640
2641fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2642 let data_type = resolve_declared_data_type(&column.data_type)
2643 .map_err(|err| RedDBError::Query(err.to_string()))?;
2644 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2645 if column.not_null {
2646 column_def = column_def.not_null();
2647 }
2648 if let Some(default) = &column.default {
2649 column_def = column_def.with_default(default.as_bytes().to_vec());
2650 }
2651 if column.compress.unwrap_or(0) > 0 {
2652 column_def = column_def.compressed();
2653 }
2654 if !column.enum_variants.is_empty() {
2655 column_def = column_def.with_variants(column.enum_variants.clone());
2656 }
2657 if let Some(precision) = column.decimal_precision {
2658 column_def = column_def.with_precision(precision);
2659 }
2660 if let Some(element_type) = &column.array_element {
2661 column_def = column_def.with_element_type(
2662 resolve_declared_data_type(element_type)
2663 .map_err(|err| RedDBError::Query(err.to_string()))?,
2664 );
2665 }
2666 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2667 if column.unique {
2668 column_def = column_def.with_metadata("unique", "true");
2669 }
2670 if column.primary_key {
2671 column_def = column_def.with_metadata("primary_key", "true");
2672 }
2673 Ok(column_def)
2674}
2675
2676fn current_unix_ms() -> u128 {
2677 std::time::SystemTime::now()
2678 .duration_since(std::time::UNIX_EPOCH)
2679 .unwrap_or_default()
2680 .as_millis()
2681}
2682
2683#[cfg(test)]
2684mod tests {
2685 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2686 use crate::auth::store::{AuthStore, PrincipalRef};
2687 use crate::auth::UserId;
2688 use crate::auth::{AuthConfig, Role};
2689 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2690 use crate::storage::schema::Value;
2691 use crate::{RedDBOptions, RedDBRuntime};
2692 use std::sync::Arc;
2693
2694 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2695 Policy {
2696 id: id.to_string(),
2697 version: 1,
2698 tenant: None,
2699 created_at: 0,
2700 updated_at: 0,
2701 statements: vec![Statement {
2702 sid: None,
2703 effect: Effect::Allow,
2704 actions: vec![ActionPattern::Exact(action.to_string())],
2705 resources: vec![ResourcePattern::Exact {
2706 kind: "collection".to_string(),
2707 name: collection.to_string(),
2708 }],
2709 condition: None,
2710 }],
2711 }
2712 }
2713
2714 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2715 let store = Arc::new(AuthStore::new(AuthConfig::default()));
2716 *rt.inner.auth_store.write() = Some(store.clone());
2717 store
2718 }
2719
2720 #[test]
2721 fn drop_denied_without_iam_policy() {
2722 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2723 rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2724 let store = wire_auth_store(&rt);
2725 let select_only = Policy {
2727 id: "select-only".to_string(),
2728 version: 1,
2729 tenant: None,
2730 created_at: 0,
2731 updated_at: 0,
2732 statements: vec![Statement {
2733 sid: None,
2734 effect: Effect::Allow,
2735 actions: vec![ActionPattern::Exact("select".to_string())],
2736 resources: vec![ResourcePattern::Wildcard],
2737 condition: None,
2738 }],
2739 };
2740 store.put_policy_internal(select_only).unwrap();
2741 let alice = UserId::from_parts(None, "alice");
2742 store
2743 .attach_policy(PrincipalRef::User(alice), "select-only")
2744 .unwrap();
2745 set_current_auth_identity("alice".to_string(), Role::Write);
2746 let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2747 clear_current_auth_identity();
2748 assert!(
2749 format!("{err}").contains("denied by IAM policy"),
2750 "got: {err}"
2751 );
2752 }
2753
2754 #[test]
2755 fn drop_allowed_with_explicit_iam_policy() {
2756 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2757 rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2758 let store = wire_auth_store(&rt);
2759 let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2760 store.put_policy_internal(policy).unwrap();
2761 let bob = UserId::from_parts(None, "bob");
2762 store
2763 .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2764 .unwrap();
2765 set_current_auth_identity("bob".to_string(), Role::Write);
2766 rt.execute_query("DROP TABLE bar").unwrap();
2767 clear_current_auth_identity();
2768 }
2769
2770 #[test]
2771 fn drop_allowed_with_wildcard_iam_policy() {
2772 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2773 rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2774 let store = wire_auth_store(&rt);
2775 let policy = Policy {
2776 id: "allow-drop-all".to_string(),
2777 version: 1,
2778 tenant: None,
2779 created_at: 0,
2780 updated_at: 0,
2781 statements: vec![Statement {
2782 sid: None,
2783 effect: Effect::Allow,
2784 actions: vec![ActionPattern::Exact("drop".to_string())],
2785 resources: vec![ResourcePattern::Wildcard],
2786 condition: None,
2787 }],
2788 };
2789 store.put_policy_internal(policy).unwrap();
2790 let carl = UserId::from_parts(None, "carl");
2791 store
2792 .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2793 .unwrap();
2794 set_current_auth_identity("carl".to_string(), Role::Write);
2795 rt.execute_query("DROP TABLE baz").unwrap();
2796 clear_current_auth_identity();
2797 }
2798
2799 #[test]
2800 fn truncate_denied_without_iam_policy() {
2801 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2802 rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2803 let store = wire_auth_store(&rt);
2804 store
2811 .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
2812 let select_only = Policy {
2814 id: "select-only-2".to_string(),
2815 version: 1,
2816 tenant: None,
2817 created_at: 0,
2818 updated_at: 0,
2819 statements: vec![Statement {
2820 sid: None,
2821 effect: Effect::Allow,
2822 actions: vec![ActionPattern::Exact("select".to_string())],
2823 resources: vec![ResourcePattern::Wildcard],
2824 condition: None,
2825 }],
2826 };
2827 store.put_policy_internal(select_only).unwrap();
2828 let dana = UserId::from_parts(None, "dana");
2829 store
2830 .attach_policy(PrincipalRef::User(dana), "select-only-2")
2831 .unwrap();
2832 set_current_auth_identity("dana".to_string(), Role::Write);
2833 let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2834 clear_current_auth_identity();
2835 assert!(
2836 format!("{err}").contains("denied by IAM policy"),
2837 "got: {err}"
2838 );
2839 }
2840
2841 #[test]
2842 fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2843 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2844 rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2845 .unwrap();
2846 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2847 .unwrap();
2848 rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2849 .unwrap();
2850
2851 let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2852 assert_eq!(truncated.statement_type, "truncate");
2853 assert_eq!(truncated.affected_rows, 0);
2854
2855 let empty = rt.execute_query("SELECT id FROM users").unwrap();
2856 assert!(empty.result.records.is_empty());
2857
2858 rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2859 .unwrap();
2860 let selected = rt
2861 .execute_query("SELECT name FROM users WHERE id = 3")
2862 .unwrap();
2863 let name = selected.result.records[0].get("name").unwrap();
2864 assert_eq!(name, &Value::text("cy"));
2865 assert!(rt.db().collection_contract("users").is_some());
2866 assert!(rt
2867 .inner
2868 .index_store
2869 .list_indices("users")
2870 .iter()
2871 .any(|index| index.name == "idx_users_id"));
2872 }
2873
2874 #[test]
2875 fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2876 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2877 rt.execute_query("CREATE QUEUE tasks").unwrap();
2878 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2879
2880 let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2881 assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2882
2883 rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2884 let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2885 assert_eq!(
2886 len.result.records[0].get("len"),
2887 Some(&Value::UnsignedInteger(0))
2888 );
2889 }
2890
2891 #[test]
2892 fn truncate_system_schema_is_read_only() {
2893 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2894 let err = rt
2895 .execute_query("TRUNCATE COLLECTION red.collections")
2896 .unwrap_err();
2897 assert!(format!("{err}").contains("system schema is read-only"));
2898 }
2899
2900 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2903 let result = rt
2904 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2905 .expect("peek queue");
2906 result
2907 .result
2908 .records
2909 .iter()
2910 .map(
2911 |record| match record.get("payload").expect("payload column") {
2912 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2913 other => panic!("expected JSON queue payload, got {other:?}"),
2914 },
2915 )
2916 .collect()
2917 }
2918
2919 #[test]
2922 fn truncate_event_enabled_table_emits_single_truncate_event() {
2923 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2924 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2925 .unwrap();
2926 rt.execute_query(
2927 "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2928 )
2929 .unwrap();
2930
2931 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2933
2934 rt.execute_query("TRUNCATE TABLE users").unwrap();
2935
2936 let events = queue_payloads(&rt, "users_events");
2937 assert_eq!(
2939 events.len(),
2940 1,
2941 "expected 1 truncate event, got {}",
2942 events.len()
2943 );
2944 let ev = events[0].as_object().expect("event is object");
2945 assert_eq!(
2946 ev.get("op").and_then(crate::json::Value::as_str),
2947 Some("truncate")
2948 );
2949 assert_eq!(
2950 ev.get("collection").and_then(crate::json::Value::as_str),
2951 Some("users")
2952 );
2953 assert_eq!(
2954 ev.get("entities_count")
2955 .and_then(crate::json::Value::as_u64),
2956 Some(3)
2957 );
2958 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2959 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2960 assert!(ev
2961 .get("event_id")
2962 .and_then(crate::json::Value::as_str)
2963 .is_some_and(|s| !s.is_empty()));
2964 }
2965
2966 #[test]
2968 fn truncate_no_events_collection_emits_nothing() {
2969 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2970 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2971 .unwrap();
2972 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2973 .unwrap();
2974 rt.execute_query("TRUNCATE TABLE plain").unwrap();
2976 let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2978 assert!(rows.result.records.is_empty());
2979 }
2980
2981 #[test]
2985 fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2986 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2987 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2988 .unwrap();
2989 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2990 .unwrap();
2991
2992 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2994
2995 rt.execute_query("DROP TABLE users").unwrap();
2996
2997 let events = queue_payloads(&rt, "users_events");
2999 assert_eq!(
3000 events.len(),
3001 1,
3002 "expected 1 collection_dropped event, got {}",
3003 events.len()
3004 );
3005 let ev = events[0].as_object().expect("event is object");
3006 assert_eq!(
3007 ev.get("op").and_then(crate::json::Value::as_str),
3008 Some("collection_dropped")
3009 );
3010 assert_eq!(
3011 ev.get("collection").and_then(crate::json::Value::as_str),
3012 Some("users")
3013 );
3014 assert_eq!(
3015 ev.get("final_entities_count")
3016 .and_then(crate::json::Value::as_u64),
3017 Some(2)
3018 );
3019 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
3020 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
3021 assert!(ev
3022 .get("event_id")
3023 .and_then(crate::json::Value::as_str)
3024 .is_some_and(|s| !s.is_empty()));
3025
3026 let err = rt.execute_query("SELECT id FROM users").unwrap_err();
3028 assert!(
3029 format!("{err}").contains("users"),
3030 "expected not-found error"
3031 );
3032 }
3033
3034 #[test]
3037 fn drop_no_events_collection_emits_nothing() {
3038 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3039 rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
3040 .unwrap();
3041 rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
3042 .unwrap();
3043 rt.execute_query("DROP TABLE plain").unwrap();
3044 let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
3046 assert!(format!("{err}").contains("plain"));
3047 }
3048
3049 #[test]
3053 fn ops_filter_insert_only_ignores_update_and_delete() {
3054 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3055 rt.execute_query(
3056 "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
3057 )
3058 .unwrap();
3059 rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
3060 .unwrap();
3061 rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
3062 .unwrap();
3063 rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
3064
3065 let events = queue_payloads(&rt, "items_events");
3066 assert_eq!(
3068 events.len(),
3069 1,
3070 "expected 1 insert event, got {}",
3071 events.len()
3072 );
3073 assert_eq!(
3074 events[0]
3075 .as_object()
3076 .unwrap()
3077 .get("op")
3078 .and_then(crate::json::Value::as_str),
3079 Some("insert")
3080 );
3081 }
3082
3083 #[test]
3085 fn where_filter_skips_rows_that_do_not_match() {
3086 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3087 rt.execute_query(
3088 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
3089 )
3090 .unwrap();
3091
3092 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
3094 .unwrap();
3095 rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
3097 .unwrap();
3098
3099 let events = queue_payloads(&rt, "users_events");
3100 assert_eq!(
3101 events.len(),
3102 1,
3103 "expected 1 event (only active), got {}",
3104 events.len()
3105 );
3106 let ev = events[0].as_object().unwrap();
3107 assert_eq!(
3108 ev.get("op").and_then(crate::json::Value::as_str),
3109 Some("insert")
3110 );
3111 let after = ev.get("after").unwrap().as_object().unwrap();
3112 assert_eq!(
3113 after.get("status").and_then(crate::json::Value::as_str),
3114 Some("active")
3115 );
3116 }
3117
3118 #[test]
3120 fn ops_filter_and_where_filter_combined() {
3121 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3122 rt.execute_query(
3123 "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
3124 )
3125 .unwrap();
3126
3127 rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
3129 .unwrap();
3130 rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
3132 .unwrap();
3133 rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
3135 .unwrap();
3136 rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
3138
3139 let events = queue_payloads(&rt, "items_events");
3140 assert_eq!(
3142 events.len(),
3143 1,
3144 "expected 1 event, got {}: {events:?}",
3145 events.len()
3146 );
3147 assert_eq!(
3148 events[0]
3149 .as_object()
3150 .unwrap()
3151 .get("op")
3152 .and_then(crate::json::Value::as_str),
3153 Some("insert")
3154 );
3155 }
3156
3157 #[test]
3159 fn where_filter_on_delete_checks_before_state() {
3160 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3161 rt.execute_query(
3162 "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
3163 )
3164 .unwrap();
3165
3166 rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
3167 .unwrap();
3168
3169 rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3171 rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
3173
3174 let events = queue_payloads(&rt, "users_events");
3175 assert_eq!(
3176 events.len(),
3177 1,
3178 "expected 1 delete event, got {}",
3179 events.len()
3180 );
3181 let ev = events[0].as_object().unwrap();
3182 assert_eq!(
3183 ev.get("op").and_then(crate::json::Value::as_str),
3184 Some("delete")
3185 );
3186 }
3187
3188 #[test]
3192 fn alter_add_column_on_event_enabled_table_succeeds() {
3193 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3194 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
3195 .unwrap();
3196 rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
3198 .unwrap();
3199 let contract = rt.db().collection_contract("users").unwrap();
3201 assert!(
3202 contract.declared_columns.iter().any(|c| c.name == "phone"),
3203 "phone column should be in contract"
3204 );
3205 assert!(
3207 contract.subscriptions.iter().any(|s| s.enabled),
3208 "subscription should remain enabled"
3209 );
3210 }
3211
3212 #[test]
3215 fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
3216 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3217 rt.execute_query(
3218 "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
3219 )
3220 .unwrap();
3221 rt.execute_query("ALTER TABLE items DROP COLUMN secret")
3223 .unwrap();
3224 let contract = rt.db().collection_contract("items").unwrap();
3225 assert!(
3226 !contract.declared_columns.iter().any(|c| c.name == "secret"),
3227 "secret column should be removed"
3228 );
3229 rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
3231 .unwrap();
3232 assert!(
3234 contract.subscriptions.iter().any(|s| s.enabled),
3235 "subscription should remain enabled"
3236 );
3237 }
3238
3239 #[test]
3245 fn create_vector_marks_collection_as_turbo_baseline() {
3246 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3247 rt.execute_query("CREATE VECTOR embeddings DIM 4").unwrap();
3248 let store = rt.db().store();
3249 assert!(
3250 crate::runtime::vector_turbo_kind::is_turbo(&store, "embeddings"),
3251 "new vector collections must be turbo-marked baseline"
3252 );
3253 assert!(
3254 rt.db().turbo_state("embeddings").is_some(),
3255 "turbo_state must materialise after CREATE VECTOR"
3256 );
3257 }
3258
3259 #[test]
3263 fn create_table_does_not_mark_turbo() {
3264 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3265 rt.execute_query("CREATE TABLE plain (id INT)").unwrap();
3266 let store = rt.db().store();
3267 assert!(
3268 !crate::runtime::vector_turbo_kind::is_turbo(&store, "plain"),
3269 "non-vector collections must not gain the turbo marker"
3270 );
3271 assert!(rt.db().turbo_state("plain").is_none());
3272 }
3273
3274 #[test]
3277 fn create_collection_kind_vector_turbo_still_marked() {
3278 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3279 rt.execute_query("CREATE COLLECTION turbo_v KIND vector.turbo DIM 4")
3280 .unwrap();
3281 let store = rt.db().store();
3282 assert!(crate::runtime::vector_turbo_kind::is_turbo(
3283 &store, "turbo_v"
3284 ));
3285 assert!(rt.db().turbo_state("turbo_v").is_some());
3286 }
3287
3288 #[test]
3291 fn create_table_persists_and_introspects_ai_policy() {
3292 use crate::catalog::{ModerateDegradedMode, ModerateRejectAction};
3293 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3294 rt.execute_query(
3295 "CREATE TABLE posts (id INT, title TEXT, body TEXT, photo TEXT) WITH ( \
3296 EMBED (fields = ('title', 'body'), provider = 'openai', model = 'text-embedding-3-small'), \
3297 MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest', sync = true, degraded = closed, on_reject = flag), \
3298 VISION (image_field = 'photo', outputs = ('caption'), provider = 'openai', model = 'gpt-4o') \
3299 )",
3300 )
3301 .expect("create table with ai policy");
3302
3303 let contracts = rt.db().collection_contracts();
3304 let contract = contracts
3305 .iter()
3306 .find(|c| c.name == "posts")
3307 .expect("contract persisted");
3308 let policy = contract.ai_policy.as_ref().expect("ai policy persisted");
3309
3310 let embed = policy.embed.as_ref().expect("embed");
3311 assert_eq!(embed.fields, vec!["title".to_string(), "body".to_string()]);
3312 assert_eq!(embed.model, "text-embedding-3-small");
3313
3314 let moderate = policy.moderate.as_ref().expect("moderate");
3315 assert!(moderate.sync_gate);
3316 assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Closed);
3317 assert_eq!(moderate.reject_action, ModerateRejectAction::Flag);
3318
3319 let vision = policy.vision.as_ref().expect("vision");
3320 assert_eq!(vision.image_field, "photo");
3321 assert_eq!(vision.model, "gpt-4o");
3322 }
3323
3324 #[test]
3328 fn create_table_rejects_ai_policy_incapable_provider() {
3329 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3330 let err = rt
3331 .execute_query(
3332 "CREATE TABLE bad (id INT, body TEXT) WITH ( \
3333 EMBED (fields = ('body'), provider = 'anthropic', model = 'claude-3-5-sonnet') \
3334 )",
3335 )
3336 .unwrap_err();
3337 assert!(
3338 err.to_string()
3339 .contains("cannot serve the 'embed' modality"),
3340 "{err}"
3341 );
3342 assert!(
3343 rt.db().store().get_collection("bad").is_none(),
3344 "rejected DDL must not create the collection"
3345 );
3346 }
3347}