1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault KV key name used for a collection's own master key.
///
/// The result has the shape `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 let exists = store.get_collection(&query.name).is_some();
38 if exists {
39 if query.if_not_exists {
40 return Ok(RuntimeQueryResult::ok_message(
41 raw_query.to_string(),
42 &format!("table '{}' already exists", query.name),
43 "create",
44 ));
45 }
46 return Err(RedDBError::Query(format!(
47 "table '{}' already exists",
48 query.name
49 )));
50 }
51
52 let contract = collection_contract_from_create_table(query)?;
55 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
56 store
58 .create_collection(&query.name)
59 .map_err(|err| RedDBError::Internal(err.to_string()))?;
60 for subscription in &contract.subscriptions {
61 ensure_event_target_queue(self, &subscription.target_queue)?;
62 }
63 if let Some(default_ttl_ms) = query.default_ttl_ms {
64 self.inner
65 .db
66 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
67 }
68 self.inner
69 .db
70 .save_collection_contract(contract)
71 .map_err(|err| RedDBError::Internal(err.to_string()))?;
72 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
73 store.set_config_tree(
74 &format!("red.collection_tenants.{}", query.name),
75 &crate::serde_json::Value::String(tenant_id),
76 );
77 }
78 self.inner
79 .db
80 .persist_metadata()
81 .map_err(|err| RedDBError::Internal(err.to_string()))?;
82 self.refresh_table_planner_stats(&query.name);
83 self.invalidate_result_cache();
84 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
87 self.schema_vocabulary_apply(
88 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
89 collection: query.name.clone(),
90 columns,
91 type_tags: Vec::new(),
92 description: None,
93 },
94 );
95 if let Some(spec) = &query.partition_by {
102 let kind_str = match spec.kind {
103 crate::storage::query::ast::PartitionKind::Range => "range",
104 crate::storage::query::ast::PartitionKind::List => "list",
105 crate::storage::query::ast::PartitionKind::Hash => "hash",
106 };
107 store.set_config_tree(
108 &format!("partition.{}.by", query.name),
109 &crate::serde_json::Value::String(kind_str.to_string()),
110 );
111 store.set_config_tree(
112 &format!("partition.{}.column", query.name),
113 &crate::serde_json::Value::String(spec.column.clone()),
114 );
115 }
116
117 if let Some(col) = &query.tenant_by {
128 store.set_config_tree(
129 &format!("tenant_tables.{}.column", query.name),
130 &crate::serde_json::Value::String(col.clone()),
131 );
132 self.register_tenant_table(&query.name, col);
133 }
134
135 let ttl_suffix = query
136 .default_ttl_ms
137 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
138 .unwrap_or_default();
139
140 let tenant_suffix = query
141 .tenant_by
142 .as_ref()
143 .map(|col| format!(" (tenant-scoped by {col})"))
144 .unwrap_or_default();
145
146 Ok(RuntimeQueryResult::ok_message(
147 raw_query.to_string(),
148 &format!(
149 "table '{}' created{}{}",
150 query.name, ttl_suffix, tenant_suffix
151 ),
152 "create",
153 ))
154 }
155
156 fn execute_create_keyed_collection(
157 &self,
158 raw_query: &str,
159 query: &CreateTableQuery,
160 ) -> RedDBResult<RuntimeQueryResult> {
161 if query.collection_model == CollectionModel::Document {
162 return Err(RedDBError::Query(
163 "NOT_YET_SUPPORTED: CREATE DOCUMENT is not implemented yet; use an auto-created table by inserting JSON rows into a normal table as a workaround".to_string(),
164 ));
165 }
166 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
167 if is_system_schema_name(&query.name) {
168 return Err(RedDBError::Query("system schema is read-only".to_string()));
169 }
170 let store = self.inner.db.store();
171 let label = polymorphic_resolver::model_name(query.collection_model);
172 if store.get_collection(&query.name).is_some() {
173 if query.if_not_exists {
174 return Ok(RuntimeQueryResult::ok_message(
175 raw_query.to_string(),
176 &format!("{label} '{}' already exists", query.name),
177 "create",
178 ));
179 }
180 return Err(RedDBError::Query(format!(
181 "{label} '{}' already exists",
182 query.name
183 )));
184 }
185
186 store
187 .create_collection(&query.name)
188 .map_err(|err| RedDBError::Internal(err.to_string()))?;
189 if query.collection_model == CollectionModel::Vault {
190 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
191 let key_scope = if query.vault_own_master_key {
192 "own"
193 } else {
194 "cluster"
195 };
196 store.set_config_tree(
197 &format!("red.vault.{}.key_scope", query.name),
198 &crate::serde_json::Value::String(key_scope.to_string()),
199 );
200 store.set_config_tree(
201 &format!("red.vault.{}.status", query.name),
202 &crate::serde_json::Value::String("sealed".to_string()),
203 );
204 }
205 self.inner
206 .db
207 .save_collection_contract(keyed_collection_contract(
208 &query.name,
209 query.collection_model,
210 ))
211 .map_err(|err| RedDBError::Internal(err.to_string()))?;
212 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
213 store.set_config_tree(
214 &format!("red.collection_tenants.{}", query.name),
215 &crate::serde_json::Value::String(tenant_id),
216 );
217 }
218 self.inner
219 .db
220 .persist_metadata()
221 .map_err(|err| RedDBError::Internal(err.to_string()))?;
222 self.invalidate_result_cache();
223
224 Ok(RuntimeQueryResult::ok_message(
225 raw_query.to_string(),
226 &format!("{label} '{}' created", query.name),
227 "create",
228 ))
229 }
230
231 pub fn execute_create_collection(
232 &self,
233 raw_query: &str,
234 query: &CreateCollectionQuery,
235 ) -> RedDBResult<RuntimeQueryResult> {
236 let model = match query.kind.as_str() {
237 "graph" => CollectionModel::Graph,
238 "document" => CollectionModel::Document,
239 other => {
240 return Err(RedDBError::Query(format!(
241 "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
242 )));
243 }
244 };
245 let create = CreateTableQuery {
246 collection_model: model,
247 name: query.name.clone(),
248 columns: Vec::new(),
249 if_not_exists: query.if_not_exists,
250 default_ttl_ms: None,
251 context_index_fields: Vec::new(),
252 context_index_enabled: false,
253 timestamps: false,
254 partition_by: None,
255 tenant_by: None,
256 append_only: false,
257 subscriptions: Vec::new(),
258 vault_own_master_key: false,
259 };
260 self.execute_create_table(raw_query, &create)
261 }
262
263 pub fn execute_create_vector(
264 &self,
265 raw_query: &str,
266 query: &CreateVectorQuery,
267 ) -> RedDBResult<RuntimeQueryResult> {
268 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
269 if is_system_schema_name(&query.name) {
270 return Err(RedDBError::Query("system schema is read-only".to_string()));
271 }
272 let store = self.inner.db.store();
273 if store.get_collection(&query.name).is_some() {
274 if query.if_not_exists {
275 return Ok(RuntimeQueryResult::ok_message(
276 raw_query.to_string(),
277 &format!("vector '{}' already exists", query.name),
278 "create",
279 ));
280 }
281 return Err(RedDBError::Query(format!(
282 "vector '{}' already exists",
283 query.name
284 )));
285 }
286
287 store
288 .create_collection(&query.name)
289 .map_err(|err| RedDBError::Internal(err.to_string()))?;
290 self.inner
291 .db
292 .save_collection_contract(vector_collection_contract(query))
293 .map_err(|err| RedDBError::Internal(err.to_string()))?;
294 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
295 store.set_config_tree(
296 &format!("red.collection_tenants.{}", query.name),
297 &crate::serde_json::Value::String(tenant_id),
298 );
299 }
300 self.inner
301 .db
302 .persist_metadata()
303 .map_err(|err| RedDBError::Internal(err.to_string()))?;
304 self.invalidate_result_cache();
305
306 Ok(RuntimeQueryResult::ok_message(
307 raw_query.to_string(),
308 &format!("vector '{}' created", query.name),
309 "create",
310 ))
311 }
312
313 fn provision_vault_key_material(
314 &self,
315 collection: &str,
316 own_master_key: bool,
317 ) -> RedDBResult<()> {
318 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
319 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
320 })?;
321 if !auth_store.is_vault_backed() {
322 return Err(RedDBError::Query(
323 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
324 ));
325 }
326
327 if auth_store.vault_secret_key().is_none() {
328 let key = crate::auth::store::random_bytes(32);
329 auth_store
330 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
331 .map_err(|err| RedDBError::Query(err.to_string()))?;
332 }
333
334 if own_master_key {
335 let key = crate::auth::store::random_bytes(32);
336 auth_store
337 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
338 .map_err(|err| RedDBError::Query(err.to_string()))?;
339 }
340
341 Ok(())
342 }
343
344 pub fn execute_drop_table(
348 &self,
349 raw_query: &str,
350 query: &DropTableQuery,
351 ) -> RedDBResult<RuntimeQueryResult> {
352 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
353 let store = self.inner.db.store();
354
355 if is_system_schema_name(&query.name) {
356 return Err(RedDBError::Query("system schema is read-only".to_string()));
357 }
358
359 let exists = store.get_collection(&query.name).is_some();
360 if !exists {
361 if query.if_exists {
362 return Ok(RuntimeQueryResult::ok_message(
363 raw_query.to_string(),
364 &format!("table '{}' does not exist", query.name),
365 "drop",
366 ));
367 }
368 return Err(RedDBError::NotFound(format!(
369 "table '{}' not found",
370 query.name
371 )));
372 }
373 let actual =
374 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
375 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
376
377 let final_count = store
380 .get_collection(&query.name)
381 .map(|manager| manager.query_all(|_| true).len() as u64)
382 .unwrap_or(0);
383 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
384 self,
385 &query.name,
386 final_count,
387 )?;
388
389 let orphaned_indices: Vec<String> = self
390 .inner
391 .index_store
392 .list_indices(&query.name)
393 .into_iter()
394 .map(|index| index.name)
395 .collect();
396 for name in &orphaned_indices {
397 self.inner.index_store.drop_index(name, &query.name);
398 }
399
400 store
401 .drop_collection(&query.name)
402 .map_err(|err| RedDBError::Internal(err.to_string()))?;
403 self.inner.db.invalidate_vector_index(&query.name);
404 self.inner.db.clear_collection_default_ttl_ms(&query.name);
405 self.inner
406 .db
407 .remove_collection_contract(&query.name)
408 .map_err(|err| RedDBError::Internal(err.to_string()))?;
409 self.clear_table_planner_stats(&query.name);
410 self.invalidate_result_cache();
411 if let Some(store) = self.inner.auth_store.read().clone() {
415 store.invalidate_visible_collections_cache();
416 }
417 self.inner
418 .db
419 .persist_metadata()
420 .map_err(|err| RedDBError::Internal(err.to_string()))?;
421 self.schema_vocabulary_apply(
427 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
428 collection: query.name.clone(),
429 },
430 );
431
432 Ok(RuntimeQueryResult::ok_message(
433 raw_query.to_string(),
434 &format!("table '{}' dropped", query.name),
435 "drop",
436 ))
437 }
438
439 pub fn execute_drop_graph(
440 &self,
441 raw_query: &str,
442 query: &DropGraphQuery,
443 ) -> RedDBResult<RuntimeQueryResult> {
444 self.execute_drop_typed_collection(
445 raw_query,
446 &query.name,
447 query.if_exists,
448 CollectionModel::Graph,
449 "graph",
450 )
451 }
452
453 pub fn execute_drop_vector(
454 &self,
455 raw_query: &str,
456 query: &DropVectorQuery,
457 ) -> RedDBResult<RuntimeQueryResult> {
458 self.execute_drop_typed_collection(
459 raw_query,
460 &query.name,
461 query.if_exists,
462 CollectionModel::Vector,
463 "vector",
464 )
465 }
466
467 pub fn execute_drop_document(
468 &self,
469 raw_query: &str,
470 query: &DropDocumentQuery,
471 ) -> RedDBResult<RuntimeQueryResult> {
472 self.execute_drop_typed_collection(
473 raw_query,
474 &query.name,
475 query.if_exists,
476 CollectionModel::Document,
477 "document",
478 )
479 }
480
481 pub fn execute_drop_kv(
482 &self,
483 raw_query: &str,
484 query: &DropKvQuery,
485 ) -> RedDBResult<RuntimeQueryResult> {
486 let label = polymorphic_resolver::model_name(query.model);
487 self.execute_drop_typed_collection(
488 raw_query,
489 &query.name,
490 query.if_exists,
491 query.model,
492 label,
493 )
494 }
495
496 pub fn execute_drop_collection(
497 &self,
498 raw_query: &str,
499 query: &DropCollectionQuery,
500 ) -> RedDBResult<RuntimeQueryResult> {
501 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
502 if is_system_schema_name(&query.name) {
503 return Err(RedDBError::Query("system schema is read-only".to_string()));
504 }
505 let store = self.inner.db.store();
506 if store.get_collection(&query.name).is_none() {
507 if query.if_exists {
508 return Ok(RuntimeQueryResult::ok_message(
509 raw_query.to_string(),
510 &format!("collection '{}' does not exist", query.name),
511 "drop",
512 ));
513 }
514 return Err(RedDBError::NotFound(format!(
515 "collection '{}' not found",
516 query.name
517 )));
518 }
519
520 match polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())? {
521 CollectionModel::Table => self.execute_drop_table(
522 raw_query,
523 &DropTableQuery {
524 name: query.name.clone(),
525 if_exists: query.if_exists,
526 },
527 ),
528 CollectionModel::TimeSeries => self.execute_drop_timeseries(
529 raw_query,
530 &DropTimeSeriesQuery {
531 name: query.name.clone(),
532 if_exists: query.if_exists,
533 },
534 ),
535 CollectionModel::Queue => self.execute_drop_queue(
536 raw_query,
537 &DropQueueQuery {
538 name: query.name.clone(),
539 if_exists: query.if_exists,
540 },
541 ),
542 CollectionModel::Graph => self.execute_drop_graph(
543 raw_query,
544 &DropGraphQuery {
545 name: query.name.clone(),
546 if_exists: query.if_exists,
547 },
548 ),
549 CollectionModel::Vector => self.execute_drop_vector(
550 raw_query,
551 &DropVectorQuery {
552 name: query.name.clone(),
553 if_exists: query.if_exists,
554 },
555 ),
556 CollectionModel::Document => self.execute_drop_document(
557 raw_query,
558 &DropDocumentQuery {
559 name: query.name.clone(),
560 if_exists: query.if_exists,
561 },
562 ),
563 CollectionModel::Kv => self.execute_drop_kv(
564 raw_query,
565 &DropKvQuery {
566 name: query.name.clone(),
567 if_exists: query.if_exists,
568 model: CollectionModel::Kv,
569 },
570 ),
571 CollectionModel::Config => self.execute_drop_kv(
572 raw_query,
573 &DropKvQuery {
574 name: query.name.clone(),
575 if_exists: query.if_exists,
576 model: CollectionModel::Config,
577 },
578 ),
579 CollectionModel::Vault => self.execute_drop_kv(
580 raw_query,
581 &DropKvQuery {
582 name: query.name.clone(),
583 if_exists: query.if_exists,
584 model: CollectionModel::Vault,
585 },
586 ),
587 CollectionModel::Hll => self.execute_probabilistic_command(
588 raw_query,
589 &ProbabilisticCommand::DropHll {
590 name: query.name.clone(),
591 if_exists: query.if_exists,
592 },
593 ),
594 CollectionModel::Sketch => self.execute_probabilistic_command(
595 raw_query,
596 &ProbabilisticCommand::DropSketch {
597 name: query.name.clone(),
598 if_exists: query.if_exists,
599 },
600 ),
601 CollectionModel::Filter => self.execute_probabilistic_command(
602 raw_query,
603 &ProbabilisticCommand::DropFilter {
604 name: query.name.clone(),
605 if_exists: query.if_exists,
606 },
607 ),
608 CollectionModel::Mixed => self.execute_drop_typed_collection(
609 raw_query,
610 &query.name,
611 query.if_exists,
612 CollectionModel::Mixed,
613 "collection",
614 ),
615 }
616 }
617
618 pub fn execute_alter_table(
624 &self,
625 raw_query: &str,
626 query: &AlterTableQuery,
627 ) -> RedDBResult<RuntimeQueryResult> {
628 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
629 let store = self.inner.db.store();
630
631 if store.get_collection(&query.name).is_none() {
633 return Err(RedDBError::NotFound(format!(
634 "table '{}' not found",
635 query.name
636 )));
637 }
638
639 let mut messages = Vec::new();
640
641 let fields_added: Vec<String> = query
643 .operations
644 .iter()
645 .filter_map(|op| {
646 if let AlterOperation::AddColumn(col) = op {
647 Some(col.name.clone())
648 } else {
649 None
650 }
651 })
652 .collect();
653 let fields_removed: Vec<String> = query
654 .operations
655 .iter()
656 .filter_map(|op| {
657 if let AlterOperation::DropColumn(name) = op {
658 Some(name.clone())
659 } else {
660 None
661 }
662 })
663 .collect();
664
665 for op in &query.operations {
666 match op {
667 AlterOperation::AddColumn(col) => {
668 messages.push(format!("column '{}' added", col.name));
670 }
671 AlterOperation::DropColumn(name) => {
672 messages.push(format!("column '{}' dropped", name));
673 }
674 AlterOperation::RenameColumn { from, to } => {
675 messages.push(format!("column '{}' renamed to '{}'", from, to));
676 }
677 AlterOperation::AttachPartition { child, bound } => {
678 store.set_config_tree(
682 &format!("partition.{}.children.{}", query.name, child),
683 &crate::serde_json::Value::String(bound.clone()),
684 );
685 messages.push(format!(
686 "partition '{child}' attached to '{}' ({bound})",
687 query.name
688 ));
689 }
690 AlterOperation::DetachPartition { child } => {
691 store.set_config_tree(
692 &format!("partition.{}.children.{}", query.name, child),
693 &crate::serde_json::Value::Null,
694 );
695 messages.push(format!(
696 "partition '{child}' detached from '{}'",
697 query.name
698 ));
699 }
700 AlterOperation::EnableRowLevelSecurity => {
701 self.inner
702 .rls_enabled_tables
703 .write()
704 .insert(query.name.clone());
705 store.set_config_tree(
707 &format!("rls.enabled.{}", query.name),
708 &crate::serde_json::Value::Bool(true),
709 );
710 self.invalidate_plan_cache();
711 messages.push(format!("row level security enabled on '{}'", query.name));
712 }
713 AlterOperation::DisableRowLevelSecurity => {
714 self.inner.rls_enabled_tables.write().remove(&query.name);
715 store.set_config_tree(
716 &format!("rls.enabled.{}", query.name),
717 &crate::serde_json::Value::Null,
718 );
719 self.invalidate_plan_cache();
720 messages.push(format!("row level security disabled on '{}'", query.name));
721 }
722 AlterOperation::EnableTenancy { column } => {
724 store.set_config_tree(
725 &format!("tenant_tables.{}.column", query.name),
726 &crate::serde_json::Value::String(column.clone()),
727 );
728 self.register_tenant_table(&query.name, column);
729 self.invalidate_plan_cache();
730 messages.push(format!(
731 "tenancy enabled on '{}' by column '{column}'",
732 query.name
733 ));
734 }
735 AlterOperation::DisableTenancy => {
736 store.set_config_tree(
737 &format!("tenant_tables.{}.column", query.name),
738 &crate::serde_json::Value::Null,
739 );
740 self.unregister_tenant_table(&query.name);
741 self.invalidate_plan_cache();
742 messages.push(format!("tenancy disabled on '{}'", query.name));
743 }
744 AlterOperation::SetAppendOnly(on) => {
745 messages.push(format!(
750 "append_only {} on '{}'",
751 if *on { "enabled" } else { "disabled" },
752 query.name
753 ));
754 }
755 AlterOperation::SetVersioned(on) => {
756 self.vcs_set_versioned(&query.name, *on)?;
763 messages.push(format!(
764 "versioned {} on '{}'",
765 if *on { "enabled" } else { "disabled" },
766 query.name
767 ));
768 }
769 AlterOperation::EnableEvents(subscription) => {
770 let mut subscription = subscription.clone();
771 subscription.source = query.name.clone();
772 validate_event_subscriptions(
773 self,
774 &query.name,
775 std::slice::from_ref(&subscription),
776 )?;
777 ensure_event_target_queue(self, &subscription.target_queue)?;
778 messages.push(format!(
779 "events enabled on '{}' to '{}'",
780 query.name, subscription.target_queue
781 ));
782 }
783 AlterOperation::DisableEvents => {
784 messages.push(format!("events disabled on '{}'", query.name));
785 }
786 AlterOperation::AddSubscription { name, descriptor } => {
787 let mut sub = descriptor.clone();
788 sub.name = name.clone();
789 sub.source = query.name.clone();
790 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
791 ensure_event_target_queue(self, &sub.target_queue)?;
792 messages.push(format!(
793 "subscription '{}' added on '{}' to '{}'",
794 name, query.name, sub.target_queue
795 ));
796 }
797 AlterOperation::DropSubscription { name } => {
798 messages.push(format!(
799 "subscription '{}' dropped on '{}'",
800 name, query.name
801 ));
802 }
803 }
804 }
805
806 let mut contract = self
807 .inner
808 .db
809 .collection_contract(&query.name)
810 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
811 apply_alter_operations_to_contract(&mut contract, &query.operations);
812 contract.version = contract.version.saturating_add(1);
813 contract.updated_at_unix_ms = current_unix_ms();
814 self.inner
815 .db
816 .save_collection_contract(contract)
817 .map_err(|err| RedDBError::Internal(err.to_string()))?;
818 if !fields_added.is_empty() || !fields_removed.is_empty() {
822 let sub_names: Vec<String> = self
823 .inner
824 .db
825 .collection_contract(&query.name)
826 .map(|c| {
827 c.subscriptions
828 .iter()
829 .filter(|s| s.enabled)
830 .map(|s| s.name.clone())
831 .collect()
832 })
833 .unwrap_or_default();
834 if !sub_names.is_empty() {
835 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
836 collection: query.name.clone(),
837 subscription_names: sub_names.join(", "),
838 fields_added: fields_added.join(", "),
839 fields_removed: fields_removed.join(", "),
840 lsn: self.cdc_current_lsn(),
841 }
842 .emit_global();
843 }
844 }
845
846 self.clear_table_planner_stats(&query.name);
847 self.invalidate_result_cache();
848 let post_alter_columns: Vec<String> = self
853 .inner
854 .db
855 .collection_contract(&query.name)
856 .map(|contract| {
857 contract
858 .declared_columns
859 .iter()
860 .map(|col| col.name.clone())
861 .collect()
862 })
863 .unwrap_or_default();
864 self.schema_vocabulary_apply(
865 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
866 collection: query.name.clone(),
867 columns: post_alter_columns,
868 type_tags: Vec::new(),
869 description: None,
870 },
871 );
872
873 let message = if messages.is_empty() {
874 format!("table '{}' altered (no operations)", query.name)
875 } else {
876 format!("table '{}' altered: {}", query.name, messages.join(", "))
877 };
878
879 Ok(RuntimeQueryResult::ok_message(
880 raw_query.to_string(),
881 &message,
882 "alter",
883 ))
884 }
885
886 pub fn execute_explain_alter(
893 &self,
894 raw_query: &str,
895 query: &ExplainAlterQuery,
896 ) -> RedDBResult<RuntimeQueryResult> {
897 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
901
902 let current_contract = self.inner.db.collection_contract(&query.target.name);
903
904 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
905 .as_ref()
906 .map(|c| c.declared_columns.clone())
907 .unwrap_or_default();
908
909 let diff = super::schema_diff::compute_column_diff(
910 &query.target.name,
911 ¤t_columns,
912 &query.target.columns,
913 );
914
915 let rendered = match query.format {
916 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
917 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
918 };
919
920 let format_label = match query.format {
921 ExplainFormat::Sql => "sql",
922 ExplainFormat::Json => "json",
923 };
924
925 let columns = vec![
926 "table".to_string(),
927 "format".to_string(),
928 "diff".to_string(),
929 ];
930 let row = vec![
931 ("table".to_string(), Value::text(query.target.name.clone())),
932 ("format".to_string(), Value::text(format_label.to_string())),
933 ("diff".to_string(), Value::text(rendered)),
934 ];
935
936 Ok(RuntimeQueryResult::ok_records(
937 raw_query.to_string(),
938 columns,
939 vec![row],
940 "explain",
941 ))
942 }
943
944 pub fn execute_create_index(
949 &self,
950 raw_query: &str,
951 query: &CreateIndexQuery,
952 ) -> RedDBResult<RuntimeQueryResult> {
953 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
954 let store = self.inner.db.store();
955
956 let manager = store
958 .get_collection(&query.table)
959 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
960
961 let method_kind = match query.method {
962 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
963 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
964 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
965 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
966 };
967
968 let entities = manager.query_all(|_| true);
978 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
979 entities
980 .iter()
981 .map(|e| {
982 let fields = match &e.data {
983 crate::storage::EntityData::Row(row) => {
984 if let Some(ref named) = row.named {
985 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
986 } else if let Some(ref schema) = row.schema {
987 schema
991 .iter()
992 .zip(row.columns.iter())
993 .map(|(k, v)| (k.clone(), v.clone()))
994 .collect()
995 } else {
996 Vec::new()
997 }
998 }
999 crate::storage::EntityData::Node(node) => node
1000 .properties
1001 .iter()
1002 .map(|(k, v)| (k.clone(), v.clone()))
1003 .collect(),
1004 _ => Vec::new(),
1005 };
1006 (e.id, fields)
1007 })
1008 .collect();
1009
1010 let indexed_count = self
1012 .inner
1013 .index_store
1014 .create_index(
1015 &query.name,
1016 &query.table,
1017 &query.columns,
1018 method_kind,
1019 query.unique,
1020 &entity_fields,
1021 )
1022 .map_err(RedDBError::Internal)?;
1023
1024 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1025 &query.table,
1026 &entity_fields,
1027 );
1028 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1029 self.invalidate_plan_cache();
1030
1031 self.inner
1033 .index_store
1034 .register(super::index_store::RegisteredIndex {
1035 name: query.name.clone(),
1036 collection: query.table.clone(),
1037 columns: query.columns.clone(),
1038 method: method_kind,
1039 unique: query.unique,
1040 });
1041 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1045 collection: query.table.clone(),
1046 index: query.name.clone(),
1047 columns: query.columns.clone(),
1048 });
1049
1050 let method_str = format!("{}", query.method);
1051 let unique_str = if query.unique { "unique " } else { "" };
1052 let cols = query.columns.join(", ");
1053
1054 Ok(RuntimeQueryResult::ok_message(
1055 raw_query.to_string(),
1056 &format!(
1057 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1058 unique_str, query.name, query.table, cols, method_str, indexed_count
1059 ),
1060 "create",
1061 ))
1062 }
1063
1064 pub fn execute_drop_index(
1068 &self,
1069 raw_query: &str,
1070 query: &DropIndexQuery,
1071 ) -> RedDBResult<RuntimeQueryResult> {
1072 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1073 let store = self.inner.db.store();
1074
1075 if store.get_collection(&query.table).is_none() {
1077 if query.if_exists {
1078 return Ok(RuntimeQueryResult::ok_message(
1079 raw_query.to_string(),
1080 &format!("table '{}' does not exist", query.table),
1081 "drop",
1082 ));
1083 }
1084 return Err(RedDBError::NotFound(format!(
1085 "table '{}' not found",
1086 query.table
1087 )));
1088 }
1089
1090 self.inner.index_store.drop_index(&query.name, &query.table);
1092 self.invalidate_plan_cache();
1093 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1095 collection: query.table.clone(),
1096 index: query.name.clone(),
1097 });
1098
1099 Ok(RuntimeQueryResult::ok_message(
1100 raw_query.to_string(),
1101 &format!("index '{}' dropped from '{}'", query.name, query.table),
1102 "drop",
1103 ))
1104 }
1105
1106 fn execute_drop_typed_collection(
1107 &self,
1108 raw_query: &str,
1109 name: &str,
1110 if_exists: bool,
1111 expected_model: CollectionModel,
1112 label: &str,
1113 ) -> RedDBResult<RuntimeQueryResult> {
1114 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1115 if is_system_schema_name(name) {
1116 return Err(RedDBError::Query("system schema is read-only".to_string()));
1117 }
1118 let store = self.inner.db.store();
1119 if store.get_collection(name).is_none() {
1120 if if_exists {
1121 return Ok(RuntimeQueryResult::ok_message(
1122 raw_query.to_string(),
1123 &format!("{label} '{name}' does not exist"),
1124 "drop",
1125 ));
1126 }
1127 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1128 }
1129
1130 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1131 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1132 self.drop_collection_storage(raw_query, name, label)
1133 }
1134
1135 pub fn execute_truncate(
1136 &self,
1137 raw_query: &str,
1138 query: &TruncateQuery,
1139 ) -> RedDBResult<RuntimeQueryResult> {
1140 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1141 if is_system_schema_name(&query.name) {
1142 return Err(RedDBError::Query("system schema is read-only".to_string()));
1143 }
1144
1145 let label = query
1146 .model
1147 .map(polymorphic_resolver::model_name)
1148 .unwrap_or("collection");
1149 let store = self.inner.db.store();
1150 if store.get_collection(&query.name).is_none() {
1151 if query.if_exists {
1152 return Ok(RuntimeQueryResult::ok_message(
1153 raw_query.to_string(),
1154 &format!("{label} '{}' does not exist", query.name),
1155 "truncate",
1156 ));
1157 }
1158 return Err(RedDBError::NotFound(format!(
1159 "{label} '{}' not found",
1160 query.name
1161 )));
1162 }
1163
1164 let actual =
1165 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1166 if let Some(expected) = query.model {
1167 polymorphic_resolver::ensure_model_match(expected, actual)?;
1168 }
1169
1170 if actual == CollectionModel::Queue {
1171 return self.execute_queue_command(
1172 raw_query,
1173 &QueueCommand::Purge {
1174 queue: query.name.clone(),
1175 },
1176 );
1177 }
1178
1179 let affected = self.truncate_collection_entities(&query.name)?;
1181 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1183 self.inner.db.invalidate_vector_index(&query.name);
1184 self.clear_table_planner_stats(&query.name);
1185 self.invalidate_result_cache();
1186
1187 Ok(RuntimeQueryResult::ok_message(
1188 raw_query.to_string(),
1189 &format!(
1190 "{affected} entities truncated from {label} '{}'",
1191 query.name
1192 ),
1193 "truncate",
1194 ))
1195 }
1196
1197 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1198 let store = self.inner.db.store();
1199 let Some(manager) = store.get_collection(name) else {
1200 return Ok(0);
1201 };
1202 let entities = manager.query_all(|_| true);
1203 if entities.is_empty() {
1204 return Ok(0);
1205 }
1206
1207 for entity in &entities {
1208 let fields = entity_index_fields(&entity.data);
1209 self.inner
1210 .index_store
1211 .index_entity_delete(name, entity.id, &fields)
1212 .map_err(RedDBError::Internal)?;
1213 }
1214
1215 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1216 let deleted_ids = store
1217 .delete_batch(name, &ids)
1218 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1219 for id in &deleted_ids {
1220 store.context_index().remove_entity(*id);
1221 }
1222 Ok(deleted_ids.len() as u64)
1223 }
1224
1225 fn drop_collection_storage(
1226 &self,
1227 raw_query: &str,
1228 name: &str,
1229 label: &str,
1230 ) -> RedDBResult<RuntimeQueryResult> {
1231 let store = self.inner.db.store();
1232
1233 let final_count = store
1236 .get_collection(name)
1237 .map(|manager| manager.query_all(|_| true).len() as u64)
1238 .unwrap_or(0);
1239 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1240 self,
1241 name,
1242 final_count,
1243 )?;
1244
1245 let orphaned_indices: Vec<String> = self
1246 .inner
1247 .index_store
1248 .list_indices(name)
1249 .into_iter()
1250 .map(|index| index.name)
1251 .collect();
1252 for index_name in &orphaned_indices {
1253 self.inner.index_store.drop_index(index_name, name);
1254 }
1255
1256 store
1257 .drop_collection(name)
1258 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1259 self.inner.db.invalidate_vector_index(name);
1260 self.inner.db.clear_collection_default_ttl_ms(name);
1261 self.inner
1262 .db
1263 .remove_collection_contract(name)
1264 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1265 self.clear_table_planner_stats(name);
1266 self.invalidate_result_cache();
1267 if let Some(store) = self.inner.auth_store.read().clone() {
1268 store.invalidate_visible_collections_cache();
1269 }
1270 self.inner
1271 .db
1272 .persist_metadata()
1273 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1274 self.schema_vocabulary_apply(
1275 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1276 collection: name.to_string(),
1277 },
1278 );
1279
1280 Ok(RuntimeQueryResult::ok_message(
1281 raw_query.to_string(),
1282 &format!("{label} '{name}' dropped"),
1283 "drop",
1284 ))
1285 }
1286}
1287
/// Returns `true` when `name` belongs to the system schema: exactly `red`,
/// anything under the `red.` prefix, or anything starting with
/// `__red_schema_`. The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];
    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1291
1292fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1293 match data {
1294 EntityData::Row(row) => {
1295 if let Some(ref named) = row.named {
1296 named
1297 .iter()
1298 .map(|(key, value)| (key.clone(), value.clone()))
1299 .collect()
1300 } else if let Some(ref schema) = row.schema {
1301 schema
1302 .iter()
1303 .zip(row.columns.iter())
1304 .map(|(key, value)| (key.clone(), value.clone()))
1305 .collect()
1306 } else {
1307 Vec::new()
1308 }
1309 }
1310 EntityData::Node(node) => node
1311 .properties
1312 .iter()
1313 .map(|(key, value)| (key.clone(), value.clone()))
1314 .collect(),
1315 _ => Vec::new(),
1316 }
1317}
1318
1319fn collection_contract_from_create_table(
1320 query: &CreateTableQuery,
1321) -> RedDBResult<crate::physical::CollectionContract> {
1322 let now = current_unix_ms();
1323 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1324 .columns
1325 .iter()
1326 .map(declared_column_contract_from_ddl)
1327 .collect();
1328 if query.timestamps {
1329 declared_columns.push(crate::physical::DeclaredColumnContract {
1333 name: "created_at".to_string(),
1334 data_type: "BIGINT".to_string(),
1335 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1336 not_null: true,
1337 default: None,
1338 compress: None,
1339 unique: false,
1340 primary_key: false,
1341 enum_variants: Vec::new(),
1342 array_element: None,
1343 decimal_precision: None,
1344 });
1345 declared_columns.push(crate::physical::DeclaredColumnContract {
1346 name: "updated_at".to_string(),
1347 data_type: "BIGINT".to_string(),
1348 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1349 not_null: true,
1350 default: None,
1351 compress: None,
1352 unique: false,
1353 primary_key: false,
1354 enum_variants: Vec::new(),
1355 array_element: None,
1356 decimal_precision: None,
1357 });
1358 }
1359 Ok(crate::physical::CollectionContract {
1360 name: query.name.clone(),
1361 declared_model: crate::catalog::CollectionModel::Table,
1362 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1363 origin: crate::physical::ContractOrigin::Explicit,
1364 version: 1,
1365 created_at_unix_ms: now,
1366 updated_at_unix_ms: now,
1367 default_ttl_ms: query.default_ttl_ms,
1368 vector_dimension: None,
1369 vector_metric: None,
1370 context_index_fields: query.context_index_fields.clone(),
1371 declared_columns,
1372 table_def: Some(build_table_def_from_create_table(query)?),
1373 timestamps_enabled: query.timestamps,
1374 context_index_enabled: query.context_index_enabled
1375 || !query.context_index_fields.is_empty(),
1376 append_only: query.append_only,
1377 subscriptions: query.subscriptions.clone(),
1378 })
1379}
1380
1381fn default_collection_contract_for_existing_table(
1382 name: &str,
1383) -> crate::physical::CollectionContract {
1384 let now = current_unix_ms();
1385 crate::physical::CollectionContract {
1386 name: name.to_string(),
1387 declared_model: crate::catalog::CollectionModel::Table,
1388 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1389 origin: crate::physical::ContractOrigin::Explicit,
1390 version: 0,
1391 created_at_unix_ms: now,
1392 updated_at_unix_ms: now,
1393 default_ttl_ms: None,
1394 vector_dimension: None,
1395 vector_metric: None,
1396 context_index_fields: Vec::new(),
1397 declared_columns: Vec::new(),
1398 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1399 timestamps_enabled: false,
1400 context_index_enabled: false,
1401 append_only: false,
1402 subscriptions: Vec::new(),
1403 }
1404}
1405
1406fn keyed_collection_contract(
1407 name: &str,
1408 model: crate::catalog::CollectionModel,
1409) -> crate::physical::CollectionContract {
1410 let now = current_unix_ms();
1411 crate::physical::CollectionContract {
1412 name: name.to_string(),
1413 declared_model: model,
1414 schema_mode: crate::catalog::SchemaMode::Dynamic,
1415 origin: crate::physical::ContractOrigin::Explicit,
1416 version: 1,
1417 created_at_unix_ms: now,
1418 updated_at_unix_ms: now,
1419 default_ttl_ms: None,
1420 vector_dimension: None,
1421 vector_metric: None,
1422 context_index_fields: Vec::new(),
1423 declared_columns: Vec::new(),
1424 table_def: None,
1425 timestamps_enabled: false,
1426 context_index_enabled: false,
1427 append_only: false,
1428 subscriptions: Vec::new(),
1429 }
1430}
1431
1432fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1433 let now = current_unix_ms();
1434 crate::physical::CollectionContract {
1435 name: query.name.clone(),
1436 declared_model: crate::catalog::CollectionModel::Vector,
1437 schema_mode: crate::catalog::SchemaMode::Dynamic,
1438 origin: crate::physical::ContractOrigin::Explicit,
1439 version: 1,
1440 created_at_unix_ms: now,
1441 updated_at_unix_ms: now,
1442 default_ttl_ms: None,
1443 vector_dimension: Some(query.dimension),
1444 vector_metric: Some(query.metric),
1445 context_index_fields: Vec::new(),
1446 declared_columns: Vec::new(),
1447 table_def: None,
1448 timestamps_enabled: false,
1449 context_index_enabled: false,
1450 append_only: false,
1451 subscriptions: Vec::new(),
1452 }
1453}
1454
1455fn declared_column_contract_from_ddl(
1456 column: &CreateColumnDef,
1457) -> crate::physical::DeclaredColumnContract {
1458 crate::physical::DeclaredColumnContract {
1459 name: column.name.clone(),
1460 data_type: column.data_type.clone(),
1461 sql_type: Some(column.sql_type.clone()),
1462 not_null: column.not_null,
1463 default: column.default.clone(),
1464 compress: column.compress,
1465 unique: column.unique,
1466 primary_key: column.primary_key,
1467 enum_variants: column.enum_variants.clone(),
1468 array_element: column.array_element.clone(),
1469 decimal_precision: column.decimal_precision,
1470 }
1471}
1472
1473fn apply_alter_operations_to_contract(
1474 contract: &mut crate::physical::CollectionContract,
1475 operations: &[AlterOperation],
1476) {
1477 if contract.table_def.is_none() {
1478 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1479 }
1480 for operation in operations {
1481 match operation {
1482 AlterOperation::AddColumn(column) => {
1483 if !contract
1484 .declared_columns
1485 .iter()
1486 .any(|existing| existing.name == column.name)
1487 {
1488 contract
1489 .declared_columns
1490 .push(declared_column_contract_from_ddl(column));
1491 }
1492 if let Some(table_def) = contract.table_def.as_mut() {
1493 if table_def.get_column(&column.name).is_none() {
1494 if let Ok(column_def) = column_def_from_ddl(column) {
1495 if column.primary_key {
1496 table_def.primary_key.push(column.name.clone());
1497 table_def.constraints.push(
1498 crate::storage::schema::Constraint::new(
1499 format!("pk_{}", column.name),
1500 crate::storage::schema::ConstraintType::PrimaryKey,
1501 )
1502 .on_columns(vec![column.name.clone()]),
1503 );
1504 }
1505 if column.unique {
1506 table_def.constraints.push(
1507 crate::storage::schema::Constraint::new(
1508 format!("uniq_{}", column.name),
1509 crate::storage::schema::ConstraintType::Unique,
1510 )
1511 .on_columns(vec![column.name.clone()]),
1512 );
1513 }
1514 if column.not_null {
1515 table_def.constraints.push(
1516 crate::storage::schema::Constraint::new(
1517 format!("not_null_{}", column.name),
1518 crate::storage::schema::ConstraintType::NotNull,
1519 )
1520 .on_columns(vec![column.name.clone()]),
1521 );
1522 }
1523 table_def.columns.push(column_def);
1524 }
1525 }
1526 }
1527 }
1528 AlterOperation::DropColumn(name) => {
1529 contract
1530 .declared_columns
1531 .retain(|column| column.name != *name);
1532 if let Some(table_def) = contract.table_def.as_mut() {
1533 if let Some(index) = table_def.column_index(name) {
1534 table_def.columns.remove(index);
1535 }
1536 table_def.primary_key.retain(|column| column != name);
1537 table_def.constraints.retain(|constraint| {
1538 !constraint.columns.iter().any(|column| column == name)
1539 });
1540 table_def
1541 .indexes
1542 .retain(|index| !index.columns.iter().any(|column| column == name));
1543 }
1544 }
1545 AlterOperation::RenameColumn { from, to } => {
1546 if contract
1547 .declared_columns
1548 .iter()
1549 .any(|column| column.name == *to)
1550 {
1551 continue;
1552 }
1553 if let Some(column) = contract
1554 .declared_columns
1555 .iter_mut()
1556 .find(|column| column.name == *from)
1557 {
1558 column.name = to.clone();
1559 }
1560 if let Some(table_def) = contract.table_def.as_mut() {
1561 if let Some(column) = table_def
1562 .columns
1563 .iter_mut()
1564 .find(|column| column.name == *from)
1565 {
1566 column.name = to.clone();
1567 }
1568 for primary_key in &mut table_def.primary_key {
1569 if *primary_key == *from {
1570 *primary_key = to.clone();
1571 }
1572 }
1573 for constraint in &mut table_def.constraints {
1574 for column in &mut constraint.columns {
1575 if *column == *from {
1576 *column = to.clone();
1577 }
1578 }
1579 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1580 for column in ref_columns {
1581 if *column == *from {
1582 *column = to.clone();
1583 }
1584 }
1585 }
1586 }
1587 for index in &mut table_def.indexes {
1588 for column in &mut index.columns {
1589 if *column == *from {
1590 *column = to.clone();
1591 }
1592 }
1593 }
1594 }
1595 }
1596 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1599 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1603 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1606 AlterOperation::SetAppendOnly(on) => {
1607 contract.append_only = *on;
1608 }
1609 AlterOperation::SetVersioned(_) => {}
1612 AlterOperation::EnableEvents(subscription) => {
1613 let mut subscription = subscription.clone();
1614 subscription.source = contract.name.clone();
1615 subscription.enabled = true;
1616 if let Some(existing) = contract
1617 .subscriptions
1618 .iter_mut()
1619 .find(|existing| existing.target_queue == subscription.target_queue)
1620 {
1621 *existing = subscription;
1622 } else {
1623 contract.subscriptions.push(subscription);
1624 }
1625 }
1626 AlterOperation::DisableEvents => {
1627 for subscription in &mut contract.subscriptions {
1628 subscription.enabled = false;
1629 }
1630 }
1631 AlterOperation::AddSubscription { name, descriptor } => {
1632 let mut sub = descriptor.clone();
1633 sub.name = name.clone();
1634 sub.source = contract.name.clone();
1635 sub.enabled = true;
1636 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1637 {
1638 *existing = sub;
1639 } else {
1640 contract.subscriptions.push(sub);
1641 }
1642 }
1643 AlterOperation::DropSubscription { name } => {
1644 contract.subscriptions.retain(|s| s.name != *name);
1645 }
1646 }
1647 }
1648}
1649
1650fn validate_event_subscriptions(
1651 runtime: &RedDBRuntime,
1652 source: &str,
1653 subscriptions: &[crate::catalog::SubscriptionDescriptor],
1654) -> RedDBResult<()> {
1655 for subscription in subscriptions
1656 .iter()
1657 .filter(|subscription| subscription.enabled)
1658 {
1659 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1660 return Err(RedDBError::Query(
1661 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1662 ));
1663 }
1664 validate_subscription_auth(runtime, source, subscription)?;
1665 if subscription.target_queue == source
1666 || subscription_would_create_cycle(
1667 &runtime.inner.db,
1668 source,
1669 &subscription.target_queue,
1670 )
1671 {
1672 return Err(RedDBError::Query(
1673 "subscription would create cycle".to_string(),
1674 ));
1675 }
1676 audit_subscription_redact_gap(runtime, source, subscription);
1677 }
1678 Ok(())
1679}
1680
1681fn validate_subscription_auth(
1682 runtime: &RedDBRuntime,
1683 source: &str,
1684 subscription: &crate::catalog::SubscriptionDescriptor,
1685) -> RedDBResult<()> {
1686 let auth_store = match runtime.inner.auth_store.read().clone() {
1687 Some(store) => store,
1688 None => return Ok(()),
1689 };
1690 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1691 Some(identity) => identity,
1692 None => return Ok(()),
1693 };
1694 let tenant = crate::runtime::impl_core::current_tenant();
1695 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1696
1697 if auth_store.iam_authorization_enabled() {
1698 let ctx = crate::auth::policies::EvalContext {
1699 principal_tenant: tenant.clone(),
1700 current_tenant: tenant.clone(),
1701 peer_ip: None,
1702 mfa_present: false,
1703 now_ms: crate::auth::now_ms(),
1704 principal_is_admin_role: role == crate::auth::Role::Admin,
1705 };
1706 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
1707 if let Some(t) = tenant.as_deref() {
1708 source_resource = source_resource.with_tenant(t.to_string());
1709 }
1710 if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
1711 return Err(RedDBError::Query(format!(
1712 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
1713 principal, source_resource.kind, source_resource.name
1714 )));
1715 }
1716
1717 let mut target_resource =
1718 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
1719 if let Some(t) = tenant.as_deref() {
1720 target_resource = target_resource.with_tenant(t.to_string());
1721 }
1722 if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
1723 return Err(RedDBError::Query(format!(
1724 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
1725 principal, target_resource.kind, target_resource.name
1726 )));
1727 }
1728 return Ok(());
1729 }
1730
1731 let ctx = crate::auth::privileges::AuthzContext {
1732 principal: &username,
1733 effective_role: role,
1734 tenant: tenant.as_deref(),
1735 };
1736 auth_store
1737 .check_grant(
1738 &ctx,
1739 crate::auth::privileges::Action::Select,
1740 &crate::auth::privileges::Resource::table_from_name(source),
1741 )
1742 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1743 auth_store
1744 .check_grant(
1745 &ctx,
1746 crate::auth::privileges::Action::Insert,
1747 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
1748 )
1749 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1750 Ok(())
1751}
1752
1753fn audit_subscription_redact_gap(
1754 runtime: &RedDBRuntime,
1755 source: &str,
1756 subscription: &crate::catalog::SubscriptionDescriptor,
1757) {
1758 let auth_store = match runtime.inner.auth_store.read().clone() {
1759 Some(store) if store.iam_authorization_enabled() => store,
1760 _ => return,
1761 };
1762 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1763 Some(identity) => identity,
1764 None => return,
1765 };
1766 let tenant = crate::runtime::impl_core::current_tenant();
1767 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1768 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
1769 if missing.is_empty() {
1770 return;
1771 }
1772
1773 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
1774 tracing::warn!(
1775 target: "reddb::operator",
1776 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
1777 source,
1778 subscription.target_queue,
1779 columns
1780 );
1781 let mut event = AuditEvent::builder("subscription_redact_gap")
1782 .principal(username)
1783 .source(AuditAuthSource::System)
1784 .resource(format!(
1785 "subscription:{}->{}",
1786 source, subscription.target_queue
1787 ))
1788 .outcome(Outcome::Success)
1789 .field(AuditFieldEscaper::field("source", source))
1790 .field(AuditFieldEscaper::field(
1791 "target_queue",
1792 subscription.target_queue.clone(),
1793 ))
1794 .field(AuditFieldEscaper::field(
1795 "subscription",
1796 subscription.name.clone(),
1797 ))
1798 .field(AuditFieldEscaper::field("columns", columns))
1799 .field(AuditFieldEscaper::field("role", role.as_str()));
1800 if let Some(t) = tenant {
1801 event = event.tenant(t);
1802 }
1803 runtime.inner.audit_log.record_event(event.build());
1804}
1805
1806fn subscription_redact_gap_columns(
1807 auth_store: &crate::auth::store::AuthStore,
1808 principal: &crate::auth::UserId,
1809 source: &str,
1810 subscription: &crate::catalog::SubscriptionDescriptor,
1811) -> BTreeSet<String> {
1812 let redacted: HashSet<String> = subscription
1813 .redact_fields
1814 .iter()
1815 .map(|field| field.to_ascii_lowercase())
1816 .collect();
1817 auth_store
1818 .effective_policies(principal)
1819 .iter()
1820 .flat_map(|policy| policy.statements.iter())
1821 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
1822 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
1823 .flat_map(|statement| statement.resources.iter())
1824 .filter_map(|resource| denied_column_for_source(resource, source))
1825 .filter(|column| !redact_covers_column(&redacted, source, column))
1826 .collect()
1827}
1828
1829fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
1830 match pattern {
1831 crate::auth::policies::ActionPattern::Wildcard => true,
1832 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
1833 crate::auth::policies::ActionPattern::Prefix(prefix) => {
1834 "select".len() > prefix.len() + 1
1835 && "select".starts_with(prefix)
1836 && "select".as_bytes()[prefix.len()] == b':'
1837 }
1838 }
1839}
1840
1841fn denied_column_for_source(
1842 resource: &crate::auth::policies::ResourcePattern,
1843 source: &str,
1844) -> Option<String> {
1845 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
1846 return None;
1847 };
1848 if kind != "column" {
1849 return None;
1850 }
1851 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
1852 (column.table_resource_name() == source).then_some(column.column)
1853}
1854
/// Reports whether the redact set covers `column` of `source`.
///
/// The set covers the column when it contains `*`, the ASCII-lowercased
/// column name, or the ASCII-lowercased `source.column` form. Entries in
/// `redacted` are compared as-is, so they must already be lowercase.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(bare.as_str()) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(qualified.as_str())
}
1860
1861fn subscription_would_create_cycle(
1862 db: &crate::storage::unified::devx::RedDB,
1863 source: &str,
1864 target: &str,
1865) -> bool {
1866 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
1867 for contract in db.collection_contracts() {
1868 for subscription in contract
1869 .subscriptions
1870 .into_iter()
1871 .filter(|subscription| subscription.enabled)
1872 {
1873 graph
1874 .entry(subscription.source)
1875 .or_default()
1876 .push(subscription.target_queue);
1877 }
1878 }
1879 graph
1880 .entry(source.to_string())
1881 .or_default()
1882 .push(target.to_string());
1883
1884 let mut stack = vec![target.to_string()];
1885 let mut seen = HashSet::new();
1886 while let Some(node) = stack.pop() {
1887 if node == source {
1888 return true;
1889 }
1890 if !seen.insert(node.clone()) {
1891 continue;
1892 }
1893 if let Some(next) = graph.get(&node) {
1894 stack.extend(next.iter().cloned());
1895 }
1896 }
1897 false
1898}
1899
/// Crate-visible entry point for `ensure_event_target_queue`: forwards both
/// arguments unchanged and returns its result.
pub(crate) fn ensure_event_target_queue_pub(
    runtime: &RedDBRuntime,
    queue: &str,
) -> RedDBResult<()> {
    ensure_event_target_queue(runtime, queue)
}
1906
1907fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
1908 let store = runtime.inner.db.store();
1909 if store.get_collection(queue).is_some() {
1910 return Ok(());
1911 }
1912 store
1913 .create_collection(queue)
1914 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1915 runtime
1916 .inner
1917 .db
1918 .save_collection_contract(event_queue_collection_contract(queue))
1919 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1920 store.set_config_tree(
1921 &format!("queue.{queue}.mode"),
1922 &crate::serde_json::Value::String("fanout".to_string()),
1923 );
1924 Ok(())
1925}
1926
1927fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
1928 let now = current_unix_ms();
1929 crate::physical::CollectionContract {
1930 name: queue.to_string(),
1931 declared_model: crate::catalog::CollectionModel::Queue,
1932 schema_mode: crate::catalog::SchemaMode::Dynamic,
1933 origin: crate::physical::ContractOrigin::Implicit,
1934 version: 1,
1935 created_at_unix_ms: now,
1936 updated_at_unix_ms: now,
1937 default_ttl_ms: None,
1938 vector_dimension: None,
1939 vector_metric: None,
1940 context_index_fields: Vec::new(),
1941 declared_columns: Vec::new(),
1942 table_def: None,
1943 timestamps_enabled: false,
1944 context_index_enabled: false,
1945 append_only: true,
1946 subscriptions: Vec::new(),
1947 }
1948}
1949
1950fn build_table_def_from_create_table(
1951 query: &CreateTableQuery,
1952) -> RedDBResult<crate::storage::schema::TableDef> {
1953 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
1954 for column in &query.columns {
1955 if column.primary_key {
1956 table.primary_key.push(column.name.clone());
1957 table.constraints.push(
1958 crate::storage::schema::Constraint::new(
1959 format!("pk_{}", column.name),
1960 crate::storage::schema::ConstraintType::PrimaryKey,
1961 )
1962 .on_columns(vec![column.name.clone()]),
1963 );
1964 }
1965 if column.unique {
1966 table.constraints.push(
1967 crate::storage::schema::Constraint::new(
1968 format!("uniq_{}", column.name),
1969 crate::storage::schema::ConstraintType::Unique,
1970 )
1971 .on_columns(vec![column.name.clone()]),
1972 );
1973 }
1974 if column.not_null {
1975 table.constraints.push(
1976 crate::storage::schema::Constraint::new(
1977 format!("not_null_{}", column.name),
1978 crate::storage::schema::ConstraintType::NotNull,
1979 )
1980 .on_columns(vec![column.name.clone()]),
1981 );
1982 }
1983 table.columns.push(column_def_from_ddl(column)?);
1984 }
1985 if query.timestamps {
1990 table.columns.push(
1991 crate::storage::schema::ColumnDef::new(
1992 "created_at".to_string(),
1993 crate::storage::schema::DataType::UnsignedInteger,
1994 )
1995 .not_null(),
1996 );
1997 table.columns.push(
1998 crate::storage::schema::ColumnDef::new(
1999 "updated_at".to_string(),
2000 crate::storage::schema::DataType::UnsignedInteger,
2001 )
2002 .not_null(),
2003 );
2004 table.constraints.push(
2005 crate::storage::schema::Constraint::new(
2006 "not_null_created_at".to_string(),
2007 crate::storage::schema::ConstraintType::NotNull,
2008 )
2009 .on_columns(vec!["created_at".to_string()]),
2010 );
2011 table.constraints.push(
2012 crate::storage::schema::Constraint::new(
2013 "not_null_updated_at".to_string(),
2014 crate::storage::schema::ConstraintType::NotNull,
2015 )
2016 .on_columns(vec!["updated_at".to_string()]),
2017 );
2018 }
2019 table
2020 .validate()
2021 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2022 Ok(table)
2023}
2024
2025fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2026 let data_type = resolve_declared_data_type(&column.data_type)
2027 .map_err(|err| RedDBError::Query(err.to_string()))?;
2028 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2029 if column.not_null {
2030 column_def = column_def.not_null();
2031 }
2032 if let Some(default) = &column.default {
2033 column_def = column_def.with_default(default.as_bytes().to_vec());
2034 }
2035 if column.compress.unwrap_or(0) > 0 {
2036 column_def = column_def.compressed();
2037 }
2038 if !column.enum_variants.is_empty() {
2039 column_def = column_def.with_variants(column.enum_variants.clone());
2040 }
2041 if let Some(precision) = column.decimal_precision {
2042 column_def = column_def.with_precision(precision);
2043 }
2044 if let Some(element_type) = &column.array_element {
2045 column_def = column_def.with_element_type(
2046 resolve_declared_data_type(element_type)
2047 .map_err(|err| RedDBError::Query(err.to_string()))?,
2048 );
2049 }
2050 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2051 if column.unique {
2052 column_def = column_def.with_metadata("unique", "true");
2053 }
2054 if column.primary_key {
2055 column_def = column_def.with_metadata("primary_key", "true");
2056 }
2057 Ok(column_def)
2058}
2059
/// Returns the current wall-clock time as milliseconds since the Unix
/// epoch, or `0` when the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2066
2067#[cfg(test)]
2068mod tests {
2069 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2070 use crate::auth::store::{AuthStore, PrincipalRef};
2071 use crate::auth::UserId;
2072 use crate::auth::{AuthConfig, Role};
2073 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2074 use crate::storage::schema::Value;
2075 use crate::{RedDBOptions, RedDBRuntime};
2076 use std::sync::Arc;
2077
2078 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2079 Policy {
2080 id: id.to_string(),
2081 version: 1,
2082 tenant: None,
2083 created_at: 0,
2084 updated_at: 0,
2085 statements: vec![Statement {
2086 sid: None,
2087 effect: Effect::Allow,
2088 actions: vec![ActionPattern::Exact(action.to_string())],
2089 resources: vec![ResourcePattern::Exact {
2090 kind: "collection".to_string(),
2091 name: collection.to_string(),
2092 }],
2093 condition: None,
2094 }],
2095 }
2096 }
2097
2098 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2099 let store = Arc::new(AuthStore::new(AuthConfig::default()));
2100 *rt.inner.auth_store.write() = Some(store.clone());
2101 store
2102 }
2103
/// A principal whose only policy allows `select` must be refused `DROP TABLE`
/// with an IAM denial.
#[test]
fn drop_denied_without_iam_policy() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
    let store = wire_auth_store(&rt);
    // alice is allowed `select` on everything and nothing else.
    let select_only = Policy {
        id: "select-only".to_string(),
        version: 1,
        tenant: None,
        created_at: 0,
        updated_at: 0,
        statements: vec![Statement {
            sid: None,
            effect: Effect::Allow,
            actions: vec![ActionPattern::Exact("select".to_string())],
            resources: vec![ResourcePattern::Wildcard],
            condition: None,
        }],
    };
    store.put_policy_internal(select_only).unwrap();
    let alice = UserId::from_parts(None, "alice");
    store
        .attach_policy(PrincipalRef::User(alice), "select-only")
        .unwrap();

    set_current_auth_identity("alice".to_string(), Role::Write);
    let outcome = rt.execute_query("DROP TABLE foo");
    // Clear the identity before inspecting the outcome so a failed
    // expectation cannot leave it installed.
    clear_current_auth_identity();

    let err = outcome.unwrap_err();
    assert!(
        format!("{err}").contains("denied by IAM policy"),
        "got: {err}"
    );
}
2137
/// A principal holding an `Allow drop` policy on the exact collection may
/// drop that table.
#[test]
fn drop_allowed_with_explicit_iam_policy() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
    let store = wire_auth_store(&rt);
    let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
    store.put_policy_internal(policy).unwrap();
    let bob = UserId::from_parts(None, "bob");
    store
        .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
        .unwrap();

    set_current_auth_identity("bob".to_string(), Role::Write);
    let outcome = rt.execute_query("DROP TABLE bar");
    // Clear the identity before unwrapping so a failing drop cannot leave
    // it installed.
    clear_current_auth_identity();

    outcome.unwrap();
}
2153
/// A principal holding an `Allow drop` policy on the wildcard resource may
/// drop a table.
#[test]
fn drop_allowed_with_wildcard_iam_policy() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
    let store = wire_auth_store(&rt);
    let policy = Policy {
        id: "allow-drop-all".to_string(),
        version: 1,
        tenant: None,
        created_at: 0,
        updated_at: 0,
        statements: vec![Statement {
            sid: None,
            effect: Effect::Allow,
            actions: vec![ActionPattern::Exact("drop".to_string())],
            resources: vec![ResourcePattern::Wildcard],
            condition: None,
        }],
    };
    store.put_policy_internal(policy).unwrap();
    let carl = UserId::from_parts(None, "carl");
    store
        .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
        .unwrap();

    set_current_auth_identity("carl".to_string(), Role::Write);
    let outcome = rt.execute_query("DROP TABLE baz");
    // Clear the identity before unwrapping so a failing drop cannot leave
    // it installed.
    clear_current_auth_identity();

    outcome.unwrap();
}
2182
/// A principal whose only policy allows `select` must be refused
/// `TRUNCATE TABLE` with an IAM denial.
#[test]
fn truncate_denied_without_iam_policy() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
    let store = wire_auth_store(&rt);
    // dana is allowed `select` on everything and nothing else.
    let select_only = Policy {
        id: "select-only-2".to_string(),
        version: 1,
        tenant: None,
        created_at: 0,
        updated_at: 0,
        statements: vec![Statement {
            sid: None,
            effect: Effect::Allow,
            actions: vec![ActionPattern::Exact("select".to_string())],
            resources: vec![ResourcePattern::Wildcard],
            condition: None,
        }],
    };
    store.put_policy_internal(select_only).unwrap();
    let dana = UserId::from_parts(None, "dana");
    store
        .attach_policy(PrincipalRef::User(dana), "select-only-2")
        .unwrap();

    set_current_auth_identity("dana".to_string(), Role::Write);
    let outcome = rt.execute_query("TRUNCATE TABLE qux");
    // Clear the identity before inspecting the outcome so a failed
    // expectation cannot leave it installed.
    clear_current_auth_identity();

    let err = outcome.unwrap_err();
    assert!(
        format!("{err}").contains("denied by IAM policy"),
        "got: {err}"
    );
}
2216
/// `TRUNCATE TABLE` empties the table while the table stays usable: new
/// rows can be inserted and queried, the collection contract is still
/// present, and the previously created index is still listed.
#[test]
fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for setup in [
        "CREATE TABLE users (id INT, name TEXT)",
        "INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')",
        "CREATE INDEX idx_users_id ON users (id) USING HASH",
    ] {
        rt.execute_query(setup).unwrap();
    }

    let outcome = rt.execute_query("TRUNCATE TABLE users").unwrap();
    assert_eq!(outcome.statement_type, "truncate");
    assert_eq!(outcome.affected_rows, 0);

    let after_truncate = rt.execute_query("SELECT id FROM users").unwrap();
    assert!(after_truncate.result.records.is_empty());

    // The table must still accept and serve rows after the truncate.
    rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
        .unwrap();
    let lookup = rt
        .execute_query("SELECT name FROM users WHERE id = 3")
        .unwrap();
    assert_eq!(
        lookup.result.records[0].get("name").unwrap(),
        &Value::text("cy")
    );

    assert!(rt.db().collection_contract("users").is_some());
    let index_names: Vec<String> = rt
        .inner
        .index_store
        .list_indices("users")
        .into_iter()
        .map(|index| index.name)
        .collect();
    assert!(index_names.iter().any(|name| name == "idx_users_id"));
}
2249
#[test]
// `TRUNCATE TABLE` against a queue is rejected with a model-mismatch error,
// while `TRUNCATE COLLECTION` on the same queue succeeds and `QUEUE LEN`
// then reports zero.
fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for setup in ["CREATE QUEUE tasks", "QUEUE PUSH tasks {'job':'a'}"] {
        rt.execute_query(setup).unwrap();
    }

    let mismatch = rt
        .execute_query("TRUNCATE TABLE tasks")
        .unwrap_err()
        .to_string();
    assert!(mismatch.contains("model mismatch: expected table, got queue"));

    rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
    let queue_len = rt.execute_query("QUEUE LEN tasks").unwrap();
    let reported = queue_len.result.records[0].get("len");
    assert_eq!(reported, Some(&Value::UnsignedInteger(0)));
}
2266
#[test]
// Truncating `red.collections` must fail with the read-only system schema
// error message.
fn truncate_system_schema_is_read_only() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let outcome = rt.execute_query("TRUNCATE COLLECTION red.collections");
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("system schema is read-only"));
}
2275
2276 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2279 let result = rt
2280 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2281 .expect("peek queue");
2282 result
2283 .result
2284 .records
2285 .iter()
2286 .map(
2287 |record| match record.get("payload").expect("payload column") {
2288 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2289 other => panic!("expected JSON queue payload, got {other:?}"),
2290 },
2291 )
2292 .collect()
2293 }
2294
#[test]
// Truncating a table created `WITH EVENTS TO users_events` must leave exactly
// one event in the queue, shaped as a `truncate` event for `users` carrying
// the number of removed entities plus `ts`, `lsn` and a non-empty `event_id`.
fn truncate_event_enabled_table_emits_single_truncate_event() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
        .unwrap();
    rt.execute_query(
        "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
    )
    .unwrap();

    // Pop whatever is already queued (presumably the insert events) so that
    // only events produced by the TRUNCATE remain to be inspected.
    rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

    rt.execute_query("TRUNCATE TABLE users").unwrap();

    let emitted = queue_payloads(&rt, "users_events");
    let emitted_count = emitted.len();
    assert_eq!(
        emitted_count, 1,
        "expected 1 truncate event, got {}",
        emitted_count
    );

    let event = emitted[0].as_object().expect("event is object");
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("truncate"));
    let collection = event.get("collection").and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));
    let entities_count = event
        .get("entities_count")
        .and_then(crate::json::Value::as_u64);
    assert_eq!(entities_count, Some(3));

    // Both bookkeeping fields must be present as unsigned integers.
    for numeric_key in ["ts", "lsn"] {
        assert!(event
            .get(numeric_key)
            .and_then(crate::json::Value::as_u64)
            .is_some());
    }
    let event_id = event.get("event_id").and_then(crate::json::Value::as_str);
    assert!(event_id.is_some_and(|s| !s.is_empty()));
}
2341
#[test]
// Truncating a table created without `WITH EVENTS` succeeds and leaves the
// table empty.
// NOTE(review): no event queue is inspected here; the test only asserts that
// the statements succeed and that the table ends up with no rows.
fn truncate_no_events_collection_emits_nothing() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE plain (id INT, val TEXT)",
        "INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')",
        "TRUNCATE TABLE plain",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let remaining = rt.execute_query("SELECT id FROM plain").unwrap();
    assert!(remaining.result.records.is_empty());
}
2356
#[test]
// Dropping a table created `WITH EVENTS TO users_events` must leave exactly
// one `collection_dropped` event in the queue, carrying the final entity
// count plus `ts`, `lsn` and a non-empty `event_id`; afterwards the table can
// no longer be queried.
fn drop_event_enabled_table_emits_single_collection_dropped_event() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
        .unwrap();
    rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
        .unwrap();

    // Pop whatever is already queued (presumably the insert events) so that
    // only events produced by the DROP remain to be inspected.
    rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

    rt.execute_query("DROP TABLE users").unwrap();

    let emitted = queue_payloads(&rt, "users_events");
    let emitted_count = emitted.len();
    assert_eq!(
        emitted_count, 1,
        "expected 1 collection_dropped event, got {}",
        emitted_count
    );

    let event = emitted[0].as_object().expect("event is object");
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("collection_dropped"));
    let collection = event.get("collection").and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));
    let final_count = event
        .get("final_entities_count")
        .and_then(crate::json::Value::as_u64);
    assert_eq!(final_count, Some(2));

    // Both bookkeeping fields must be present as unsigned integers.
    for numeric_key in ["ts", "lsn"] {
        assert!(event
            .get(numeric_key)
            .and_then(crate::json::Value::as_u64)
            .is_some());
    }
    let event_id = event.get("event_id").and_then(crate::json::Value::as_str);
    assert!(event_id.is_some_and(|s| !s.is_empty()));

    // Querying the dropped table must fail with an error naming it.
    let err = rt.execute_query("SELECT id FROM users").unwrap_err();
    let names_table = err.to_string().contains("users");
    assert!(names_table, "expected not-found error");
}
2409
#[test]
// Dropping a table created without `WITH EVENTS` succeeds, and a later query
// against it fails with an error that names the table.
// NOTE(review): no event queue is inspected here; only the drop itself and the
// follow-up query error are asserted.
fn drop_no_events_collection_emits_nothing() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE plain (id INT, val TEXT)",
        "INSERT INTO plain (id, val) VALUES (1, 'a')",
        "DROP TABLE plain",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let message = rt
        .execute_query("SELECT id FROM plain")
        .unwrap_err()
        .to_string();
    assert!(message.contains("plain"));
}
2424
#[test]
// With `WITH EVENTS (INSERT)`, an insert followed by an update and a delete
// must leave exactly one event in the queue, and that event is the insert.
fn ops_filter_insert_only_ignores_update_and_delete() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query(
        "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
    )
    .unwrap();
    for mutation in [
        "INSERT INTO items (id, val) VALUES (1, 'a')",
        "UPDATE items SET val = 'b' WHERE id = 1",
        "DELETE FROM items WHERE id = 1",
    ] {
        rt.execute_query(mutation).unwrap();
    }

    let delivered = queue_payloads(&rt, "items_events");
    let delivered_count = delivered.len();
    assert_eq!(
        delivered_count, 1,
        "expected 1 insert event, got {}",
        delivered_count
    );

    let first = delivered[0].as_object().unwrap();
    let op = first.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));
}
2458
#[test]
// With an event filter `WHERE status = 'active'`, inserting one active and
// one inactive row must queue a single insert event whose `after` image has
// `status` equal to `active`.
fn where_filter_skips_rows_that_do_not_match() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query(
        "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
    )
    .unwrap();

    for insert in [
        "INSERT INTO users (id, status) VALUES (1, 'active')",
        "INSERT INTO users (id, status) VALUES (2, 'inactive')",
    ] {
        rt.execute_query(insert).unwrap();
    }

    let delivered = queue_payloads(&rt, "users_events");
    let delivered_count = delivered.len();
    assert_eq!(
        delivered_count, 1,
        "expected 1 event (only active), got {}",
        delivered_count
    );

    let event = delivered[0].as_object().unwrap();
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));

    let after_image = event.get("after").unwrap().as_object().unwrap();
    let status = after_image
        .get("status")
        .and_then(crate::json::Value::as_str);
    assert_eq!(status, Some("active"));
}
2493
#[test]
// Combines an ops filter `(INSERT, UPDATE)` with `WHERE status = 'active'`.
// After an active insert, an inactive insert, an update that sets row 1 to
// inactive, and a delete of row 2, the queue must hold exactly one event and
// it must be an insert.
fn ops_filter_and_where_filter_combined() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query(
        "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
    )
    .unwrap();

    for mutation in [
        "INSERT INTO items (id, status) VALUES (1, 'active')",
        "INSERT INTO items (id, status) VALUES (2, 'inactive')",
        "UPDATE items SET status = 'inactive' WHERE id = 1",
        "DELETE FROM items WHERE id = 2",
    ] {
        rt.execute_query(mutation).unwrap();
    }

    // The local stays named `events` because the failure message captures it.
    let events = queue_payloads(&rt, "items_events");
    let event_count = events.len();
    assert_eq!(
        event_count, 1,
        "expected 1 event, got {}: {events:?}",
        event_count
    );

    let first = events[0].as_object().unwrap();
    let op = first.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));
}
2532
#[test]
// With `WITH EVENTS (DELETE) WHERE status = 'active'`, deleting one active
// and one inactive row must queue exactly one event, and it is a delete.
fn where_filter_on_delete_checks_before_state() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query(
        "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
    )
    .unwrap();

    rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
        .unwrap();

    // Row 1 was active and row 2 inactive when they were deleted.
    for deletion in [
        "DELETE FROM users WHERE id = 1",
        "DELETE FROM users WHERE id = 2",
    ] {
        rt.execute_query(deletion).unwrap();
    }

    let delivered = queue_payloads(&rt, "users_events");
    let delivered_count = delivered.len();
    assert_eq!(
        delivered_count, 1,
        "expected 1 delete event, got {}",
        delivered_count
    );

    let event = delivered[0].as_object().unwrap();
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("delete"));
}
2563
#[test]
// `ALTER TABLE ... ADD COLUMN` on an event-enabled table succeeds; the
// contract read afterwards lists the new column and still has an enabled
// subscription.
fn alter_add_column_on_event_enabled_table_succeeds() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events",
        "ALTER TABLE users ADD COLUMN phone TEXT",
    ] {
        rt.execute_query(statement).unwrap();
    }

    let contract = rt.db().collection_contract("users").unwrap();

    let has_phone = contract.declared_columns.iter().any(|c| c.name == "phone");
    assert!(has_phone, "phone column should be in contract");

    let still_subscribed = contract.subscriptions.iter().any(|s| s.enabled);
    assert!(still_subscribed, "subscription should remain enabled");
}
2587
#[test]
// `ALTER TABLE ... DROP COLUMN` and `ALTER TABLE ... ENABLE ROW LEVEL SECURITY`
// both succeed on an event-enabled table, and an enabled subscription is
// still present on the contract after each of them.
//
// The contract is read again after every ALTER: a value fetched before a
// statement runs is not guaranteed to reflect that statement, so asserting on
// it would not exercise the post-ALTER state.
fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query(
        "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
    )
    .unwrap();

    rt.execute_query("ALTER TABLE items DROP COLUMN secret")
        .unwrap();
    let after_drop = rt.db().collection_contract("items").unwrap();
    assert!(
        !after_drop
            .declared_columns
            .iter()
            .any(|c| c.name == "secret"),
        "secret column should be removed"
    );
    assert!(
        after_drop.subscriptions.iter().any(|s| s.enabled),
        "subscription should remain enabled"
    );

    rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
        .unwrap();
    // Fetch the contract again so the assertion observes the post-RLS state.
    let after_rls = rt.db().collection_contract("items").unwrap();
    assert!(
        after_rls.subscriptions.iter().any(|s| s.enabled),
        "subscription should remain enabled"
    );
}
2614}