1use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault KV key under which a collection's own master key is stored.
///
/// The returned key has the shape `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20 pub fn execute_create_table(
26 &self,
27 raw_query: &str,
28 query: &CreateTableQuery,
29 ) -> RedDBResult<RuntimeQueryResult> {
30 if query.collection_model != CollectionModel::Table {
31 return self.execute_create_keyed_collection(raw_query, query);
32 }
33 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34 let store = self.inner.db.store();
35 analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36 let exists = store.get_collection(&query.name).is_some();
38 if exists {
39 if query.if_not_exists {
40 return Ok(RuntimeQueryResult::ok_message(
41 raw_query.to_string(),
42 &format!("table '{}' already exists", query.name),
43 "create",
44 ));
45 }
46 return Err(RedDBError::Query(format!(
47 "table '{}' already exists",
48 query.name
49 )));
50 }
51
52 let contract = collection_contract_from_create_table(query)?;
55 validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
56 store
58 .create_collection(&query.name)
59 .map_err(|err| RedDBError::Internal(err.to_string()))?;
60 for subscription in &contract.subscriptions {
61 ensure_event_target_queue(self, &subscription.target_queue)?;
62 }
63 if let Some(default_ttl_ms) = query.default_ttl_ms {
64 self.inner
65 .db
66 .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
67 }
68 self.inner
69 .db
70 .save_collection_contract(contract)
71 .map_err(|err| RedDBError::Internal(err.to_string()))?;
72 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
73 store.set_config_tree(
74 &format!("red.collection_tenants.{}", query.name),
75 &crate::serde_json::Value::String(tenant_id),
76 );
77 }
78 self.inner
79 .db
80 .persist_metadata()
81 .map_err(|err| RedDBError::Internal(err.to_string()))?;
82 self.refresh_table_planner_stats(&query.name);
83 self.invalidate_result_cache();
84 let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
87 self.schema_vocabulary_apply(
88 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
89 collection: query.name.clone(),
90 columns,
91 type_tags: Vec::new(),
92 description: None,
93 },
94 );
95 if let Some(spec) = &query.partition_by {
102 let kind_str = match spec.kind {
103 crate::storage::query::ast::PartitionKind::Range => "range",
104 crate::storage::query::ast::PartitionKind::List => "list",
105 crate::storage::query::ast::PartitionKind::Hash => "hash",
106 };
107 store.set_config_tree(
108 &format!("partition.{}.by", query.name),
109 &crate::serde_json::Value::String(kind_str.to_string()),
110 );
111 store.set_config_tree(
112 &format!("partition.{}.column", query.name),
113 &crate::serde_json::Value::String(spec.column.clone()),
114 );
115 }
116
117 if let Some(col) = &query.tenant_by {
128 store.set_config_tree(
129 &format!("tenant_tables.{}.column", query.name),
130 &crate::serde_json::Value::String(col.clone()),
131 );
132 self.register_tenant_table(&query.name, col);
133 }
134
135 let ttl_suffix = query
136 .default_ttl_ms
137 .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
138 .unwrap_or_default();
139
140 let tenant_suffix = query
141 .tenant_by
142 .as_ref()
143 .map(|col| format!(" (tenant-scoped by {col})"))
144 .unwrap_or_default();
145
146 Ok(RuntimeQueryResult::ok_message(
147 raw_query.to_string(),
148 &format!(
149 "table '{}' created{}{}",
150 query.name, ttl_suffix, tenant_suffix
151 ),
152 "create",
153 ))
154 }
155
156 fn execute_create_keyed_collection(
157 &self,
158 raw_query: &str,
159 query: &CreateTableQuery,
160 ) -> RedDBResult<RuntimeQueryResult> {
161 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
162 if is_system_schema_name(&query.name) {
163 return Err(RedDBError::Query("system schema is read-only".to_string()));
164 }
165 let store = self.inner.db.store();
166 let label = polymorphic_resolver::model_name(query.collection_model);
167 if store.get_collection(&query.name).is_some() {
168 if query.if_not_exists {
169 return Ok(RuntimeQueryResult::ok_message(
170 raw_query.to_string(),
171 &format!("{label} '{}' already exists", query.name),
172 "create",
173 ));
174 }
175 return Err(RedDBError::Query(format!(
176 "{label} '{}' already exists",
177 query.name
178 )));
179 }
180
181 store
182 .create_collection(&query.name)
183 .map_err(|err| RedDBError::Internal(err.to_string()))?;
184 if query.collection_model == CollectionModel::Vault {
185 self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
186 let key_scope = if query.vault_own_master_key {
187 "own"
188 } else {
189 "cluster"
190 };
191 store.set_config_tree(
192 &format!("red.vault.{}.key_scope", query.name),
193 &crate::serde_json::Value::String(key_scope.to_string()),
194 );
195 store.set_config_tree(
196 &format!("red.vault.{}.status", query.name),
197 &crate::serde_json::Value::String("sealed".to_string()),
198 );
199 }
200 self.inner
201 .db
202 .save_collection_contract(keyed_collection_contract(
203 &query.name,
204 query.collection_model,
205 ))
206 .map_err(|err| RedDBError::Internal(err.to_string()))?;
207 if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
208 store.set_config_tree(
209 &format!("red.collection_tenants.{}", query.name),
210 &crate::serde_json::Value::String(tenant_id),
211 );
212 }
213 self.inner
214 .db
215 .persist_metadata()
216 .map_err(|err| RedDBError::Internal(err.to_string()))?;
217 self.invalidate_result_cache();
218
219 Ok(RuntimeQueryResult::ok_message(
220 raw_query.to_string(),
221 &format!("{label} '{}' created", query.name),
222 "create",
223 ))
224 }
225
226 fn provision_vault_key_material(
227 &self,
228 collection: &str,
229 own_master_key: bool,
230 ) -> RedDBResult<()> {
231 let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
232 RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
233 })?;
234 if !auth_store.is_vault_backed() {
235 return Err(RedDBError::Query(
236 "CREATE VAULT requires an enabled, unsealed vault".to_string(),
237 ));
238 }
239
240 if auth_store.vault_secret_key().is_none() {
241 let key = crate::auth::store::random_bytes(32);
242 auth_store
243 .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
244 .map_err(|err| RedDBError::Query(err.to_string()))?;
245 }
246
247 if own_master_key {
248 let key = crate::auth::store::random_bytes(32);
249 auth_store
250 .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
251 .map_err(|err| RedDBError::Query(err.to_string()))?;
252 }
253
254 Ok(())
255 }
256
257 pub fn execute_drop_table(
261 &self,
262 raw_query: &str,
263 query: &DropTableQuery,
264 ) -> RedDBResult<RuntimeQueryResult> {
265 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
266 let store = self.inner.db.store();
267
268 if is_system_schema_name(&query.name) {
269 return Err(RedDBError::Query("system schema is read-only".to_string()));
270 }
271
272 let exists = store.get_collection(&query.name).is_some();
273 if !exists {
274 if query.if_exists {
275 return Ok(RuntimeQueryResult::ok_message(
276 raw_query.to_string(),
277 &format!("table '{}' does not exist", query.name),
278 "drop",
279 ));
280 }
281 return Err(RedDBError::NotFound(format!(
282 "table '{}' not found",
283 query.name
284 )));
285 }
286 let actual =
287 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
288 polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
289
290 let final_count = store
293 .get_collection(&query.name)
294 .map(|manager| manager.query_all(|_| true).len() as u64)
295 .unwrap_or(0);
296 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
297 self,
298 &query.name,
299 final_count,
300 )?;
301
302 let orphaned_indices: Vec<String> = self
303 .inner
304 .index_store
305 .list_indices(&query.name)
306 .into_iter()
307 .map(|index| index.name)
308 .collect();
309 for name in &orphaned_indices {
310 self.inner.index_store.drop_index(name, &query.name);
311 }
312
313 store
314 .drop_collection(&query.name)
315 .map_err(|err| RedDBError::Internal(err.to_string()))?;
316 self.inner.db.invalidate_vector_index(&query.name);
317 self.inner.db.clear_collection_default_ttl_ms(&query.name);
318 self.inner
319 .db
320 .remove_collection_contract(&query.name)
321 .map_err(|err| RedDBError::Internal(err.to_string()))?;
322 self.clear_table_planner_stats(&query.name);
323 self.invalidate_result_cache();
324 if let Some(store) = self.inner.auth_store.read().clone() {
328 store.invalidate_visible_collections_cache();
329 }
330 self.inner
331 .db
332 .persist_metadata()
333 .map_err(|err| RedDBError::Internal(err.to_string()))?;
334 self.schema_vocabulary_apply(
340 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
341 collection: query.name.clone(),
342 },
343 );
344
345 Ok(RuntimeQueryResult::ok_message(
346 raw_query.to_string(),
347 &format!("table '{}' dropped", query.name),
348 "drop",
349 ))
350 }
351
352 pub fn execute_drop_graph(
353 &self,
354 raw_query: &str,
355 query: &DropGraphQuery,
356 ) -> RedDBResult<RuntimeQueryResult> {
357 self.execute_drop_typed_collection(
358 raw_query,
359 &query.name,
360 query.if_exists,
361 CollectionModel::Graph,
362 "graph",
363 )
364 }
365
366 pub fn execute_drop_vector(
367 &self,
368 raw_query: &str,
369 query: &DropVectorQuery,
370 ) -> RedDBResult<RuntimeQueryResult> {
371 self.execute_drop_typed_collection(
372 raw_query,
373 &query.name,
374 query.if_exists,
375 CollectionModel::Vector,
376 "vector",
377 )
378 }
379
380 pub fn execute_drop_document(
381 &self,
382 raw_query: &str,
383 query: &DropDocumentQuery,
384 ) -> RedDBResult<RuntimeQueryResult> {
385 self.execute_drop_typed_collection(
386 raw_query,
387 &query.name,
388 query.if_exists,
389 CollectionModel::Document,
390 "document",
391 )
392 }
393
394 pub fn execute_drop_kv(
395 &self,
396 raw_query: &str,
397 query: &DropKvQuery,
398 ) -> RedDBResult<RuntimeQueryResult> {
399 let label = polymorphic_resolver::model_name(query.model);
400 self.execute_drop_typed_collection(
401 raw_query,
402 &query.name,
403 query.if_exists,
404 query.model,
405 label,
406 )
407 }
408
409 pub fn execute_drop_collection(
410 &self,
411 raw_query: &str,
412 query: &DropCollectionQuery,
413 ) -> RedDBResult<RuntimeQueryResult> {
414 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
415 if is_system_schema_name(&query.name) {
416 return Err(RedDBError::Query("system schema is read-only".to_string()));
417 }
418 let store = self.inner.db.store();
419 if store.get_collection(&query.name).is_none() {
420 if query.if_exists {
421 return Ok(RuntimeQueryResult::ok_message(
422 raw_query.to_string(),
423 &format!("collection '{}' does not exist", query.name),
424 "drop",
425 ));
426 }
427 return Err(RedDBError::NotFound(format!(
428 "collection '{}' not found",
429 query.name
430 )));
431 }
432
433 match polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())? {
434 CollectionModel::Table => self.execute_drop_table(
435 raw_query,
436 &DropTableQuery {
437 name: query.name.clone(),
438 if_exists: query.if_exists,
439 },
440 ),
441 CollectionModel::TimeSeries => self.execute_drop_timeseries(
442 raw_query,
443 &DropTimeSeriesQuery {
444 name: query.name.clone(),
445 if_exists: query.if_exists,
446 },
447 ),
448 CollectionModel::Queue => self.execute_drop_queue(
449 raw_query,
450 &DropQueueQuery {
451 name: query.name.clone(),
452 if_exists: query.if_exists,
453 },
454 ),
455 CollectionModel::Graph => self.execute_drop_graph(
456 raw_query,
457 &DropGraphQuery {
458 name: query.name.clone(),
459 if_exists: query.if_exists,
460 },
461 ),
462 CollectionModel::Vector => self.execute_drop_vector(
463 raw_query,
464 &DropVectorQuery {
465 name: query.name.clone(),
466 if_exists: query.if_exists,
467 },
468 ),
469 CollectionModel::Document => self.execute_drop_document(
470 raw_query,
471 &DropDocumentQuery {
472 name: query.name.clone(),
473 if_exists: query.if_exists,
474 },
475 ),
476 CollectionModel::Kv => self.execute_drop_kv(
477 raw_query,
478 &DropKvQuery {
479 name: query.name.clone(),
480 if_exists: query.if_exists,
481 model: CollectionModel::Kv,
482 },
483 ),
484 CollectionModel::Config => self.execute_drop_kv(
485 raw_query,
486 &DropKvQuery {
487 name: query.name.clone(),
488 if_exists: query.if_exists,
489 model: CollectionModel::Config,
490 },
491 ),
492 CollectionModel::Vault => self.execute_drop_kv(
493 raw_query,
494 &DropKvQuery {
495 name: query.name.clone(),
496 if_exists: query.if_exists,
497 model: CollectionModel::Vault,
498 },
499 ),
500 CollectionModel::Mixed => self.execute_drop_typed_collection(
501 raw_query,
502 &query.name,
503 query.if_exists,
504 CollectionModel::Mixed,
505 "collection",
506 ),
507 }
508 }
509
510 pub fn execute_alter_table(
516 &self,
517 raw_query: &str,
518 query: &AlterTableQuery,
519 ) -> RedDBResult<RuntimeQueryResult> {
520 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
521 let store = self.inner.db.store();
522
523 if store.get_collection(&query.name).is_none() {
525 return Err(RedDBError::NotFound(format!(
526 "table '{}' not found",
527 query.name
528 )));
529 }
530
531 let mut messages = Vec::new();
532
533 let fields_added: Vec<String> = query
535 .operations
536 .iter()
537 .filter_map(|op| {
538 if let AlterOperation::AddColumn(col) = op {
539 Some(col.name.clone())
540 } else {
541 None
542 }
543 })
544 .collect();
545 let fields_removed: Vec<String> = query
546 .operations
547 .iter()
548 .filter_map(|op| {
549 if let AlterOperation::DropColumn(name) = op {
550 Some(name.clone())
551 } else {
552 None
553 }
554 })
555 .collect();
556
557 for op in &query.operations {
558 match op {
559 AlterOperation::AddColumn(col) => {
560 messages.push(format!("column '{}' added", col.name));
562 }
563 AlterOperation::DropColumn(name) => {
564 messages.push(format!("column '{}' dropped", name));
565 }
566 AlterOperation::RenameColumn { from, to } => {
567 messages.push(format!("column '{}' renamed to '{}'", from, to));
568 }
569 AlterOperation::AttachPartition { child, bound } => {
570 store.set_config_tree(
574 &format!("partition.{}.children.{}", query.name, child),
575 &crate::serde_json::Value::String(bound.clone()),
576 );
577 messages.push(format!(
578 "partition '{child}' attached to '{}' ({bound})",
579 query.name
580 ));
581 }
582 AlterOperation::DetachPartition { child } => {
583 store.set_config_tree(
584 &format!("partition.{}.children.{}", query.name, child),
585 &crate::serde_json::Value::Null,
586 );
587 messages.push(format!(
588 "partition '{child}' detached from '{}'",
589 query.name
590 ));
591 }
592 AlterOperation::EnableRowLevelSecurity => {
593 self.inner
594 .rls_enabled_tables
595 .write()
596 .insert(query.name.clone());
597 store.set_config_tree(
599 &format!("rls.enabled.{}", query.name),
600 &crate::serde_json::Value::Bool(true),
601 );
602 self.invalidate_plan_cache();
603 messages.push(format!("row level security enabled on '{}'", query.name));
604 }
605 AlterOperation::DisableRowLevelSecurity => {
606 self.inner.rls_enabled_tables.write().remove(&query.name);
607 store.set_config_tree(
608 &format!("rls.enabled.{}", query.name),
609 &crate::serde_json::Value::Null,
610 );
611 self.invalidate_plan_cache();
612 messages.push(format!("row level security disabled on '{}'", query.name));
613 }
614 AlterOperation::EnableTenancy { column } => {
616 store.set_config_tree(
617 &format!("tenant_tables.{}.column", query.name),
618 &crate::serde_json::Value::String(column.clone()),
619 );
620 self.register_tenant_table(&query.name, column);
621 self.invalidate_plan_cache();
622 messages.push(format!(
623 "tenancy enabled on '{}' by column '{column}'",
624 query.name
625 ));
626 }
627 AlterOperation::DisableTenancy => {
628 store.set_config_tree(
629 &format!("tenant_tables.{}.column", query.name),
630 &crate::serde_json::Value::Null,
631 );
632 self.unregister_tenant_table(&query.name);
633 self.invalidate_plan_cache();
634 messages.push(format!("tenancy disabled on '{}'", query.name));
635 }
636 AlterOperation::SetAppendOnly(on) => {
637 messages.push(format!(
642 "append_only {} on '{}'",
643 if *on { "enabled" } else { "disabled" },
644 query.name
645 ));
646 }
647 AlterOperation::SetVersioned(on) => {
648 self.vcs_set_versioned(&query.name, *on)?;
655 messages.push(format!(
656 "versioned {} on '{}'",
657 if *on { "enabled" } else { "disabled" },
658 query.name
659 ));
660 }
661 AlterOperation::EnableEvents(subscription) => {
662 let mut subscription = subscription.clone();
663 subscription.source = query.name.clone();
664 validate_event_subscriptions(
665 self,
666 &query.name,
667 std::slice::from_ref(&subscription),
668 )?;
669 ensure_event_target_queue(self, &subscription.target_queue)?;
670 messages.push(format!(
671 "events enabled on '{}' to '{}'",
672 query.name, subscription.target_queue
673 ));
674 }
675 AlterOperation::DisableEvents => {
676 messages.push(format!("events disabled on '{}'", query.name));
677 }
678 AlterOperation::AddSubscription { name, descriptor } => {
679 let mut sub = descriptor.clone();
680 sub.name = name.clone();
681 sub.source = query.name.clone();
682 validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
683 ensure_event_target_queue(self, &sub.target_queue)?;
684 messages.push(format!(
685 "subscription '{}' added on '{}' to '{}'",
686 name, query.name, sub.target_queue
687 ));
688 }
689 AlterOperation::DropSubscription { name } => {
690 messages.push(format!(
691 "subscription '{}' dropped on '{}'",
692 name, query.name
693 ));
694 }
695 }
696 }
697
698 let mut contract = self
699 .inner
700 .db
701 .collection_contract(&query.name)
702 .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
703 apply_alter_operations_to_contract(&mut contract, &query.operations);
704 contract.version = contract.version.saturating_add(1);
705 contract.updated_at_unix_ms = current_unix_ms();
706 self.inner
707 .db
708 .save_collection_contract(contract)
709 .map_err(|err| RedDBError::Internal(err.to_string()))?;
710 if !fields_added.is_empty() || !fields_removed.is_empty() {
714 let sub_names: Vec<String> = self
715 .inner
716 .db
717 .collection_contract(&query.name)
718 .map(|c| {
719 c.subscriptions
720 .iter()
721 .filter(|s| s.enabled)
722 .map(|s| s.name.clone())
723 .collect()
724 })
725 .unwrap_or_default();
726 if !sub_names.is_empty() {
727 crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
728 collection: query.name.clone(),
729 subscription_names: sub_names.join(", "),
730 fields_added: fields_added.join(", "),
731 fields_removed: fields_removed.join(", "),
732 lsn: self.cdc_current_lsn(),
733 }
734 .emit_global();
735 }
736 }
737
738 self.clear_table_planner_stats(&query.name);
739 self.invalidate_result_cache();
740 let post_alter_columns: Vec<String> = self
745 .inner
746 .db
747 .collection_contract(&query.name)
748 .map(|contract| {
749 contract
750 .declared_columns
751 .iter()
752 .map(|col| col.name.clone())
753 .collect()
754 })
755 .unwrap_or_default();
756 self.schema_vocabulary_apply(
757 crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
758 collection: query.name.clone(),
759 columns: post_alter_columns,
760 type_tags: Vec::new(),
761 description: None,
762 },
763 );
764
765 let message = if messages.is_empty() {
766 format!("table '{}' altered (no operations)", query.name)
767 } else {
768 format!("table '{}' altered: {}", query.name, messages.join(", "))
769 };
770
771 Ok(RuntimeQueryResult::ok_message(
772 raw_query.to_string(),
773 &message,
774 "alter",
775 ))
776 }
777
778 pub fn execute_explain_alter(
785 &self,
786 raw_query: &str,
787 query: &ExplainAlterQuery,
788 ) -> RedDBResult<RuntimeQueryResult> {
789 analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
793
794 let current_contract = self.inner.db.collection_contract(&query.target.name);
795
796 let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
797 .as_ref()
798 .map(|c| c.declared_columns.clone())
799 .unwrap_or_default();
800
801 let diff = super::schema_diff::compute_column_diff(
802 &query.target.name,
803 ¤t_columns,
804 &query.target.columns,
805 );
806
807 let rendered = match query.format {
808 ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
809 ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
810 };
811
812 let format_label = match query.format {
813 ExplainFormat::Sql => "sql",
814 ExplainFormat::Json => "json",
815 };
816
817 let columns = vec![
818 "table".to_string(),
819 "format".to_string(),
820 "diff".to_string(),
821 ];
822 let row = vec![
823 ("table".to_string(), Value::text(query.target.name.clone())),
824 ("format".to_string(), Value::text(format_label.to_string())),
825 ("diff".to_string(), Value::text(rendered)),
826 ];
827
828 Ok(RuntimeQueryResult::ok_records(
829 raw_query.to_string(),
830 columns,
831 vec![row],
832 "explain",
833 ))
834 }
835
836 pub fn execute_create_index(
841 &self,
842 raw_query: &str,
843 query: &CreateIndexQuery,
844 ) -> RedDBResult<RuntimeQueryResult> {
845 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
846 let store = self.inner.db.store();
847
848 let manager = store
850 .get_collection(&query.table)
851 .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
852
853 let method_kind = match query.method {
854 IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
855 IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
856 IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
857 IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
858 };
859
860 let entities = manager.query_all(|_| true);
870 let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
871 entities
872 .iter()
873 .map(|e| {
874 let fields = match &e.data {
875 crate::storage::EntityData::Row(row) => {
876 if let Some(ref named) = row.named {
877 named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
878 } else if let Some(ref schema) = row.schema {
879 schema
883 .iter()
884 .zip(row.columns.iter())
885 .map(|(k, v)| (k.clone(), v.clone()))
886 .collect()
887 } else {
888 Vec::new()
889 }
890 }
891 crate::storage::EntityData::Node(node) => node
892 .properties
893 .iter()
894 .map(|(k, v)| (k.clone(), v.clone()))
895 .collect(),
896 _ => Vec::new(),
897 };
898 (e.id, fields)
899 })
900 .collect();
901
902 let indexed_count = self
904 .inner
905 .index_store
906 .create_index(
907 &query.name,
908 &query.table,
909 &query.columns,
910 method_kind,
911 query.unique,
912 &entity_fields,
913 )
914 .map_err(RedDBError::Internal)?;
915
916 let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
917 &query.table,
918 &entity_fields,
919 );
920 crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
921 self.invalidate_plan_cache();
922
923 self.inner
925 .index_store
926 .register(super::index_store::RegisteredIndex {
927 name: query.name.clone(),
928 collection: query.table.clone(),
929 columns: query.columns.clone(),
930 method: method_kind,
931 unique: query.unique,
932 });
933 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
937 collection: query.table.clone(),
938 index: query.name.clone(),
939 columns: query.columns.clone(),
940 });
941
942 let method_str = format!("{}", query.method);
943 let unique_str = if query.unique { "unique " } else { "" };
944 let cols = query.columns.join(", ");
945
946 Ok(RuntimeQueryResult::ok_message(
947 raw_query.to_string(),
948 &format!(
949 "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
950 unique_str, query.name, query.table, cols, method_str, indexed_count
951 ),
952 "create",
953 ))
954 }
955
956 pub fn execute_drop_index(
960 &self,
961 raw_query: &str,
962 query: &DropIndexQuery,
963 ) -> RedDBResult<RuntimeQueryResult> {
964 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
965 let store = self.inner.db.store();
966
967 if store.get_collection(&query.table).is_none() {
969 if query.if_exists {
970 return Ok(RuntimeQueryResult::ok_message(
971 raw_query.to_string(),
972 &format!("table '{}' does not exist", query.table),
973 "drop",
974 ));
975 }
976 return Err(RedDBError::NotFound(format!(
977 "table '{}' not found",
978 query.table
979 )));
980 }
981
982 self.inner.index_store.drop_index(&query.name, &query.table);
984 self.invalidate_plan_cache();
985 self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
987 collection: query.table.clone(),
988 index: query.name.clone(),
989 });
990
991 Ok(RuntimeQueryResult::ok_message(
992 raw_query.to_string(),
993 &format!("index '{}' dropped from '{}'", query.name, query.table),
994 "drop",
995 ))
996 }
997
998 fn execute_drop_typed_collection(
999 &self,
1000 raw_query: &str,
1001 name: &str,
1002 if_exists: bool,
1003 expected_model: CollectionModel,
1004 label: &str,
1005 ) -> RedDBResult<RuntimeQueryResult> {
1006 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1007 if is_system_schema_name(name) {
1008 return Err(RedDBError::Query("system schema is read-only".to_string()));
1009 }
1010 let store = self.inner.db.store();
1011 if store.get_collection(name).is_none() {
1012 if if_exists {
1013 return Ok(RuntimeQueryResult::ok_message(
1014 raw_query.to_string(),
1015 &format!("{label} '{name}' does not exist"),
1016 "drop",
1017 ));
1018 }
1019 return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1020 }
1021
1022 let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1023 polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1024 self.drop_collection_storage(raw_query, name, label)
1025 }
1026
1027 pub fn execute_truncate(
1028 &self,
1029 raw_query: &str,
1030 query: &TruncateQuery,
1031 ) -> RedDBResult<RuntimeQueryResult> {
1032 self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1033 if is_system_schema_name(&query.name) {
1034 return Err(RedDBError::Query("system schema is read-only".to_string()));
1035 }
1036
1037 let label = query
1038 .model
1039 .map(polymorphic_resolver::model_name)
1040 .unwrap_or("collection");
1041 let store = self.inner.db.store();
1042 if store.get_collection(&query.name).is_none() {
1043 if query.if_exists {
1044 return Ok(RuntimeQueryResult::ok_message(
1045 raw_query.to_string(),
1046 &format!("{label} '{}' does not exist", query.name),
1047 "truncate",
1048 ));
1049 }
1050 return Err(RedDBError::NotFound(format!(
1051 "{label} '{}' not found",
1052 query.name
1053 )));
1054 }
1055
1056 let actual =
1057 polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1058 if let Some(expected) = query.model {
1059 polymorphic_resolver::ensure_model_match(expected, actual)?;
1060 }
1061
1062 if actual == CollectionModel::Queue {
1063 return self.execute_queue_command(
1064 raw_query,
1065 &QueueCommand::Purge {
1066 queue: query.name.clone(),
1067 },
1068 );
1069 }
1070
1071 let affected = self.truncate_collection_entities(&query.name)?;
1073 crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1075 self.inner.db.invalidate_vector_index(&query.name);
1076 self.clear_table_planner_stats(&query.name);
1077 self.invalidate_result_cache();
1078
1079 Ok(RuntimeQueryResult::ok_message(
1080 raw_query.to_string(),
1081 &format!(
1082 "{affected} entities truncated from {label} '{}'",
1083 query.name
1084 ),
1085 "truncate",
1086 ))
1087 }
1088
1089 fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1090 let store = self.inner.db.store();
1091 let Some(manager) = store.get_collection(name) else {
1092 return Ok(0);
1093 };
1094 let entities = manager.query_all(|_| true);
1095 if entities.is_empty() {
1096 return Ok(0);
1097 }
1098
1099 for entity in &entities {
1100 let fields = entity_index_fields(&entity.data);
1101 self.inner
1102 .index_store
1103 .index_entity_delete(name, entity.id, &fields)
1104 .map_err(RedDBError::Internal)?;
1105 }
1106
1107 let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1108 let deleted_ids = store
1109 .delete_batch(name, &ids)
1110 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1111 for id in &deleted_ids {
1112 store.context_index().remove_entity(*id);
1113 }
1114 Ok(deleted_ids.len() as u64)
1115 }
1116
1117 fn drop_collection_storage(
1118 &self,
1119 raw_query: &str,
1120 name: &str,
1121 label: &str,
1122 ) -> RedDBResult<RuntimeQueryResult> {
1123 let store = self.inner.db.store();
1124
1125 let final_count = store
1128 .get_collection(name)
1129 .map(|manager| manager.query_all(|_| true).len() as u64)
1130 .unwrap_or(0);
1131 crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1132 self,
1133 name,
1134 final_count,
1135 )?;
1136
1137 let orphaned_indices: Vec<String> = self
1138 .inner
1139 .index_store
1140 .list_indices(name)
1141 .into_iter()
1142 .map(|index| index.name)
1143 .collect();
1144 for index_name in &orphaned_indices {
1145 self.inner.index_store.drop_index(index_name, name);
1146 }
1147
1148 store
1149 .drop_collection(name)
1150 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1151 self.inner.db.invalidate_vector_index(name);
1152 self.inner.db.clear_collection_default_ttl_ms(name);
1153 self.inner
1154 .db
1155 .remove_collection_contract(name)
1156 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1157 self.clear_table_planner_stats(name);
1158 self.invalidate_result_cache();
1159 if let Some(store) = self.inner.auth_store.read().clone() {
1160 store.invalidate_visible_collections_cache();
1161 }
1162 self.inner
1163 .db
1164 .persist_metadata()
1165 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1166 self.schema_vocabulary_apply(
1167 crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1168 collection: name.to_string(),
1169 },
1170 );
1171
1172 Ok(RuntimeQueryResult::ok_message(
1173 raw_query.to_string(),
1174 &format!("{label} '{name}' dropped"),
1175 "drop",
1176 ))
1177 }
1178}
1179
/// Returns `true` when `name` belongs to the reserved system schema.
///
/// A name is reserved when it is exactly `red`, or starts with `red.` or `__red_schema_`.
/// The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    match name {
        "red" => true,
        other => RESERVED_PREFIXES
            .iter()
            .any(|prefix| other.starts_with(*prefix)),
    }
}
1183
1184fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1185 match data {
1186 EntityData::Row(row) => {
1187 if let Some(ref named) = row.named {
1188 named
1189 .iter()
1190 .map(|(key, value)| (key.clone(), value.clone()))
1191 .collect()
1192 } else if let Some(ref schema) = row.schema {
1193 schema
1194 .iter()
1195 .zip(row.columns.iter())
1196 .map(|(key, value)| (key.clone(), value.clone()))
1197 .collect()
1198 } else {
1199 Vec::new()
1200 }
1201 }
1202 EntityData::Node(node) => node
1203 .properties
1204 .iter()
1205 .map(|(key, value)| (key.clone(), value.clone()))
1206 .collect(),
1207 _ => Vec::new(),
1208 }
1209}
1210
1211fn collection_contract_from_create_table(
1212 query: &CreateTableQuery,
1213) -> RedDBResult<crate::physical::CollectionContract> {
1214 let now = current_unix_ms();
1215 let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1216 .columns
1217 .iter()
1218 .map(declared_column_contract_from_ddl)
1219 .collect();
1220 if query.timestamps {
1221 declared_columns.push(crate::physical::DeclaredColumnContract {
1225 name: "created_at".to_string(),
1226 data_type: "BIGINT".to_string(),
1227 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1228 not_null: true,
1229 default: None,
1230 compress: None,
1231 unique: false,
1232 primary_key: false,
1233 enum_variants: Vec::new(),
1234 array_element: None,
1235 decimal_precision: None,
1236 });
1237 declared_columns.push(crate::physical::DeclaredColumnContract {
1238 name: "updated_at".to_string(),
1239 data_type: "BIGINT".to_string(),
1240 sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1241 not_null: true,
1242 default: None,
1243 compress: None,
1244 unique: false,
1245 primary_key: false,
1246 enum_variants: Vec::new(),
1247 array_element: None,
1248 decimal_precision: None,
1249 });
1250 }
1251 Ok(crate::physical::CollectionContract {
1252 name: query.name.clone(),
1253 declared_model: crate::catalog::CollectionModel::Table,
1254 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1255 origin: crate::physical::ContractOrigin::Explicit,
1256 version: 1,
1257 created_at_unix_ms: now,
1258 updated_at_unix_ms: now,
1259 default_ttl_ms: query.default_ttl_ms,
1260 context_index_fields: query.context_index_fields.clone(),
1261 declared_columns,
1262 table_def: Some(build_table_def_from_create_table(query)?),
1263 timestamps_enabled: query.timestamps,
1264 context_index_enabled: query.context_index_enabled
1265 || !query.context_index_fields.is_empty(),
1266 append_only: query.append_only,
1267 subscriptions: query.subscriptions.clone(),
1268 })
1269}
1270
1271fn default_collection_contract_for_existing_table(
1272 name: &str,
1273) -> crate::physical::CollectionContract {
1274 let now = current_unix_ms();
1275 crate::physical::CollectionContract {
1276 name: name.to_string(),
1277 declared_model: crate::catalog::CollectionModel::Table,
1278 schema_mode: crate::catalog::SchemaMode::SemiStructured,
1279 origin: crate::physical::ContractOrigin::Explicit,
1280 version: 0,
1281 created_at_unix_ms: now,
1282 updated_at_unix_ms: now,
1283 default_ttl_ms: None,
1284 context_index_fields: Vec::new(),
1285 declared_columns: Vec::new(),
1286 table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1287 timestamps_enabled: false,
1288 context_index_enabled: false,
1289 append_only: false,
1290 subscriptions: Vec::new(),
1291 }
1292}
1293
1294fn keyed_collection_contract(
1295 name: &str,
1296 model: crate::catalog::CollectionModel,
1297) -> crate::physical::CollectionContract {
1298 let now = current_unix_ms();
1299 crate::physical::CollectionContract {
1300 name: name.to_string(),
1301 declared_model: model,
1302 schema_mode: crate::catalog::SchemaMode::Dynamic,
1303 origin: crate::physical::ContractOrigin::Explicit,
1304 version: 1,
1305 created_at_unix_ms: now,
1306 updated_at_unix_ms: now,
1307 default_ttl_ms: None,
1308 context_index_fields: Vec::new(),
1309 declared_columns: Vec::new(),
1310 table_def: None,
1311 timestamps_enabled: false,
1312 context_index_enabled: false,
1313 append_only: false,
1314 subscriptions: Vec::new(),
1315 }
1316}
1317
1318fn declared_column_contract_from_ddl(
1319 column: &CreateColumnDef,
1320) -> crate::physical::DeclaredColumnContract {
1321 crate::physical::DeclaredColumnContract {
1322 name: column.name.clone(),
1323 data_type: column.data_type.clone(),
1324 sql_type: Some(column.sql_type.clone()),
1325 not_null: column.not_null,
1326 default: column.default.clone(),
1327 compress: column.compress,
1328 unique: column.unique,
1329 primary_key: column.primary_key,
1330 enum_variants: column.enum_variants.clone(),
1331 array_element: column.array_element.clone(),
1332 decimal_precision: column.decimal_precision,
1333 }
1334}
1335
1336fn apply_alter_operations_to_contract(
1337 contract: &mut crate::physical::CollectionContract,
1338 operations: &[AlterOperation],
1339) {
1340 if contract.table_def.is_none() {
1341 contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1342 }
1343 for operation in operations {
1344 match operation {
1345 AlterOperation::AddColumn(column) => {
1346 if !contract
1347 .declared_columns
1348 .iter()
1349 .any(|existing| existing.name == column.name)
1350 {
1351 contract
1352 .declared_columns
1353 .push(declared_column_contract_from_ddl(column));
1354 }
1355 if let Some(table_def) = contract.table_def.as_mut() {
1356 if table_def.get_column(&column.name).is_none() {
1357 if let Ok(column_def) = column_def_from_ddl(column) {
1358 if column.primary_key {
1359 table_def.primary_key.push(column.name.clone());
1360 table_def.constraints.push(
1361 crate::storage::schema::Constraint::new(
1362 format!("pk_{}", column.name),
1363 crate::storage::schema::ConstraintType::PrimaryKey,
1364 )
1365 .on_columns(vec![column.name.clone()]),
1366 );
1367 }
1368 if column.unique {
1369 table_def.constraints.push(
1370 crate::storage::schema::Constraint::new(
1371 format!("uniq_{}", column.name),
1372 crate::storage::schema::ConstraintType::Unique,
1373 )
1374 .on_columns(vec![column.name.clone()]),
1375 );
1376 }
1377 if column.not_null {
1378 table_def.constraints.push(
1379 crate::storage::schema::Constraint::new(
1380 format!("not_null_{}", column.name),
1381 crate::storage::schema::ConstraintType::NotNull,
1382 )
1383 .on_columns(vec![column.name.clone()]),
1384 );
1385 }
1386 table_def.columns.push(column_def);
1387 }
1388 }
1389 }
1390 }
1391 AlterOperation::DropColumn(name) => {
1392 contract
1393 .declared_columns
1394 .retain(|column| column.name != *name);
1395 if let Some(table_def) = contract.table_def.as_mut() {
1396 if let Some(index) = table_def.column_index(name) {
1397 table_def.columns.remove(index);
1398 }
1399 table_def.primary_key.retain(|column| column != name);
1400 table_def.constraints.retain(|constraint| {
1401 !constraint.columns.iter().any(|column| column == name)
1402 });
1403 table_def
1404 .indexes
1405 .retain(|index| !index.columns.iter().any(|column| column == name));
1406 }
1407 }
1408 AlterOperation::RenameColumn { from, to } => {
1409 if contract
1410 .declared_columns
1411 .iter()
1412 .any(|column| column.name == *to)
1413 {
1414 continue;
1415 }
1416 if let Some(column) = contract
1417 .declared_columns
1418 .iter_mut()
1419 .find(|column| column.name == *from)
1420 {
1421 column.name = to.clone();
1422 }
1423 if let Some(table_def) = contract.table_def.as_mut() {
1424 if let Some(column) = table_def
1425 .columns
1426 .iter_mut()
1427 .find(|column| column.name == *from)
1428 {
1429 column.name = to.clone();
1430 }
1431 for primary_key in &mut table_def.primary_key {
1432 if *primary_key == *from {
1433 *primary_key = to.clone();
1434 }
1435 }
1436 for constraint in &mut table_def.constraints {
1437 for column in &mut constraint.columns {
1438 if *column == *from {
1439 *column = to.clone();
1440 }
1441 }
1442 if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1443 for column in ref_columns {
1444 if *column == *from {
1445 *column = to.clone();
1446 }
1447 }
1448 }
1449 }
1450 for index in &mut table_def.indexes {
1451 for column in &mut index.columns {
1452 if *column == *from {
1453 *column = to.clone();
1454 }
1455 }
1456 }
1457 }
1458 }
1459 AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1462 AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1466 AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1469 AlterOperation::SetAppendOnly(on) => {
1470 contract.append_only = *on;
1471 }
1472 AlterOperation::SetVersioned(_) => {}
1475 AlterOperation::EnableEvents(subscription) => {
1476 let mut subscription = subscription.clone();
1477 subscription.source = contract.name.clone();
1478 subscription.enabled = true;
1479 if let Some(existing) = contract
1480 .subscriptions
1481 .iter_mut()
1482 .find(|existing| existing.target_queue == subscription.target_queue)
1483 {
1484 *existing = subscription;
1485 } else {
1486 contract.subscriptions.push(subscription);
1487 }
1488 }
1489 AlterOperation::DisableEvents => {
1490 for subscription in &mut contract.subscriptions {
1491 subscription.enabled = false;
1492 }
1493 }
1494 AlterOperation::AddSubscription { name, descriptor } => {
1495 let mut sub = descriptor.clone();
1496 sub.name = name.clone();
1497 sub.source = contract.name.clone();
1498 sub.enabled = true;
1499 if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1500 {
1501 *existing = sub;
1502 } else {
1503 contract.subscriptions.push(sub);
1504 }
1505 }
1506 AlterOperation::DropSubscription { name } => {
1507 contract.subscriptions.retain(|s| s.name != *name);
1508 }
1509 }
1510 }
1511}
1512
1513fn validate_event_subscriptions(
1514 runtime: &RedDBRuntime,
1515 source: &str,
1516 subscriptions: &[crate::catalog::SubscriptionDescriptor],
1517) -> RedDBResult<()> {
1518 for subscription in subscriptions
1519 .iter()
1520 .filter(|subscription| subscription.enabled)
1521 {
1522 if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1523 return Err(RedDBError::Query(
1524 "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1525 ));
1526 }
1527 validate_subscription_auth(runtime, source, subscription)?;
1528 if subscription.target_queue == source
1529 || subscription_would_create_cycle(
1530 &runtime.inner.db,
1531 source,
1532 &subscription.target_queue,
1533 )
1534 {
1535 return Err(RedDBError::Query(
1536 "subscription would create cycle".to_string(),
1537 ));
1538 }
1539 audit_subscription_redact_gap(runtime, source, subscription);
1540 }
1541 Ok(())
1542}
1543
1544fn validate_subscription_auth(
1545 runtime: &RedDBRuntime,
1546 source: &str,
1547 subscription: &crate::catalog::SubscriptionDescriptor,
1548) -> RedDBResult<()> {
1549 let auth_store = match runtime.inner.auth_store.read().clone() {
1550 Some(store) => store,
1551 None => return Ok(()),
1552 };
1553 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1554 Some(identity) => identity,
1555 None => return Ok(()),
1556 };
1557 let tenant = crate::runtime::impl_core::current_tenant();
1558 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1559
1560 if auth_store.iam_authorization_enabled() {
1561 let ctx = crate::auth::policies::EvalContext {
1562 principal_tenant: tenant.clone(),
1563 current_tenant: tenant.clone(),
1564 peer_ip: None,
1565 mfa_present: false,
1566 now_ms: crate::auth::now_ms(),
1567 principal_is_admin_role: role == crate::auth::Role::Admin,
1568 };
1569 let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
1570 if let Some(t) = tenant.as_deref() {
1571 source_resource = source_resource.with_tenant(t.to_string());
1572 }
1573 if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
1574 return Err(RedDBError::Query(format!(
1575 "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
1576 principal, source_resource.kind, source_resource.name
1577 )));
1578 }
1579
1580 let mut target_resource =
1581 crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
1582 if let Some(t) = tenant.as_deref() {
1583 target_resource = target_resource.with_tenant(t.to_string());
1584 }
1585 if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
1586 return Err(RedDBError::Query(format!(
1587 "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
1588 principal, target_resource.kind, target_resource.name
1589 )));
1590 }
1591 return Ok(());
1592 }
1593
1594 let ctx = crate::auth::privileges::AuthzContext {
1595 principal: &username,
1596 effective_role: role,
1597 tenant: tenant.as_deref(),
1598 };
1599 auth_store
1600 .check_grant(
1601 &ctx,
1602 crate::auth::privileges::Action::Select,
1603 &crate::auth::privileges::Resource::table_from_name(source),
1604 )
1605 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1606 auth_store
1607 .check_grant(
1608 &ctx,
1609 crate::auth::privileges::Action::Insert,
1610 &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
1611 )
1612 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1613 Ok(())
1614}
1615
1616fn audit_subscription_redact_gap(
1617 runtime: &RedDBRuntime,
1618 source: &str,
1619 subscription: &crate::catalog::SubscriptionDescriptor,
1620) {
1621 let auth_store = match runtime.inner.auth_store.read().clone() {
1622 Some(store) if store.iam_authorization_enabled() => store,
1623 _ => return,
1624 };
1625 let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1626 Some(identity) => identity,
1627 None => return,
1628 };
1629 let tenant = crate::runtime::impl_core::current_tenant();
1630 let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1631 let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
1632 if missing.is_empty() {
1633 return;
1634 }
1635
1636 let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
1637 tracing::warn!(
1638 target: "reddb::operator",
1639 "subscription_redact_gap: source={} target_queue={} columns=[{}]",
1640 source,
1641 subscription.target_queue,
1642 columns
1643 );
1644 let mut event = AuditEvent::builder("subscription_redact_gap")
1645 .principal(username)
1646 .source(AuditAuthSource::System)
1647 .resource(format!(
1648 "subscription:{}->{}",
1649 source, subscription.target_queue
1650 ))
1651 .outcome(Outcome::Success)
1652 .field(AuditFieldEscaper::field("source", source))
1653 .field(AuditFieldEscaper::field(
1654 "target_queue",
1655 subscription.target_queue.clone(),
1656 ))
1657 .field(AuditFieldEscaper::field(
1658 "subscription",
1659 subscription.name.clone(),
1660 ))
1661 .field(AuditFieldEscaper::field("columns", columns))
1662 .field(AuditFieldEscaper::field("role", role.as_str()));
1663 if let Some(t) = tenant {
1664 event = event.tenant(t);
1665 }
1666 runtime.inner.audit_log.record_event(event.build());
1667}
1668
1669fn subscription_redact_gap_columns(
1670 auth_store: &crate::auth::store::AuthStore,
1671 principal: &crate::auth::UserId,
1672 source: &str,
1673 subscription: &crate::catalog::SubscriptionDescriptor,
1674) -> BTreeSet<String> {
1675 let redacted: HashSet<String> = subscription
1676 .redact_fields
1677 .iter()
1678 .map(|field| field.to_ascii_lowercase())
1679 .collect();
1680 auth_store
1681 .effective_policies(principal)
1682 .iter()
1683 .flat_map(|policy| policy.statements.iter())
1684 .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
1685 .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
1686 .flat_map(|statement| statement.resources.iter())
1687 .filter_map(|resource| denied_column_for_source(resource, source))
1688 .filter(|column| !redact_covers_column(&redacted, source, column))
1689 .collect()
1690}
1691
1692fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
1693 match pattern {
1694 crate::auth::policies::ActionPattern::Wildcard => true,
1695 crate::auth::policies::ActionPattern::Exact(action) => action == "select",
1696 crate::auth::policies::ActionPattern::Prefix(prefix) => {
1697 "select".len() > prefix.len() + 1
1698 && "select".starts_with(prefix)
1699 && "select".as_bytes()[prefix.len()] == b':'
1700 }
1701 }
1702}
1703
1704fn denied_column_for_source(
1705 resource: &crate::auth::policies::ResourcePattern,
1706 source: &str,
1707) -> Option<String> {
1708 let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
1709 return None;
1710 };
1711 if kind != "column" {
1712 return None;
1713 }
1714 let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
1715 (column.table_resource_name() == source).then_some(column.column)
1716}
1717
/// Reports whether the redact set covers `column` of `source`.
///
/// The set covers the column when it contains `*`, the lowercased column
/// name, or the lowercased `source.column` form. Entries in `redacted` are
/// compared as-is, so callers are expected to have lowercased them.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(bare.as_str()) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(qualified.as_str())
}
1723
1724fn subscription_would_create_cycle(
1725 db: &crate::storage::unified::devx::RedDB,
1726 source: &str,
1727 target: &str,
1728) -> bool {
1729 let mut graph: HashMap<String, Vec<String>> = HashMap::new();
1730 for contract in db.collection_contracts() {
1731 for subscription in contract
1732 .subscriptions
1733 .into_iter()
1734 .filter(|subscription| subscription.enabled)
1735 {
1736 graph
1737 .entry(subscription.source)
1738 .or_default()
1739 .push(subscription.target_queue);
1740 }
1741 }
1742 graph
1743 .entry(source.to_string())
1744 .or_default()
1745 .push(target.to_string());
1746
1747 let mut stack = vec![target.to_string()];
1748 let mut seen = HashSet::new();
1749 while let Some(node) = stack.pop() {
1750 if node == source {
1751 return true;
1752 }
1753 if !seen.insert(node.clone()) {
1754 continue;
1755 }
1756 if let Some(next) = graph.get(&node) {
1757 stack.extend(next.iter().cloned());
1758 }
1759 }
1760 false
1761}
1762
1763pub(crate) fn ensure_event_target_queue_pub(
1764 runtime: &RedDBRuntime,
1765 queue: &str,
1766) -> RedDBResult<()> {
1767 ensure_event_target_queue(runtime, queue)
1768}
1769
1770fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
1771 let store = runtime.inner.db.store();
1772 if store.get_collection(queue).is_some() {
1773 return Ok(());
1774 }
1775 store
1776 .create_collection(queue)
1777 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1778 runtime
1779 .inner
1780 .db
1781 .save_collection_contract(event_queue_collection_contract(queue))
1782 .map_err(|err| RedDBError::Internal(err.to_string()))?;
1783 store.set_config_tree(
1784 &format!("queue.{queue}.mode"),
1785 &crate::serde_json::Value::String("fanout".to_string()),
1786 );
1787 Ok(())
1788}
1789
1790fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
1791 let now = current_unix_ms();
1792 crate::physical::CollectionContract {
1793 name: queue.to_string(),
1794 declared_model: crate::catalog::CollectionModel::Queue,
1795 schema_mode: crate::catalog::SchemaMode::Dynamic,
1796 origin: crate::physical::ContractOrigin::Implicit,
1797 version: 1,
1798 created_at_unix_ms: now,
1799 updated_at_unix_ms: now,
1800 default_ttl_ms: None,
1801 context_index_fields: Vec::new(),
1802 declared_columns: Vec::new(),
1803 table_def: None,
1804 timestamps_enabled: false,
1805 context_index_enabled: false,
1806 append_only: true,
1807 subscriptions: Vec::new(),
1808 }
1809}
1810
1811fn build_table_def_from_create_table(
1812 query: &CreateTableQuery,
1813) -> RedDBResult<crate::storage::schema::TableDef> {
1814 let mut table = crate::storage::schema::TableDef::new(query.name.clone());
1815 for column in &query.columns {
1816 if column.primary_key {
1817 table.primary_key.push(column.name.clone());
1818 table.constraints.push(
1819 crate::storage::schema::Constraint::new(
1820 format!("pk_{}", column.name),
1821 crate::storage::schema::ConstraintType::PrimaryKey,
1822 )
1823 .on_columns(vec![column.name.clone()]),
1824 );
1825 }
1826 if column.unique {
1827 table.constraints.push(
1828 crate::storage::schema::Constraint::new(
1829 format!("uniq_{}", column.name),
1830 crate::storage::schema::ConstraintType::Unique,
1831 )
1832 .on_columns(vec![column.name.clone()]),
1833 );
1834 }
1835 if column.not_null {
1836 table.constraints.push(
1837 crate::storage::schema::Constraint::new(
1838 format!("not_null_{}", column.name),
1839 crate::storage::schema::ConstraintType::NotNull,
1840 )
1841 .on_columns(vec![column.name.clone()]),
1842 );
1843 }
1844 table.columns.push(column_def_from_ddl(column)?);
1845 }
1846 if query.timestamps {
1851 table.columns.push(
1852 crate::storage::schema::ColumnDef::new(
1853 "created_at".to_string(),
1854 crate::storage::schema::DataType::UnsignedInteger,
1855 )
1856 .not_null(),
1857 );
1858 table.columns.push(
1859 crate::storage::schema::ColumnDef::new(
1860 "updated_at".to_string(),
1861 crate::storage::schema::DataType::UnsignedInteger,
1862 )
1863 .not_null(),
1864 );
1865 table.constraints.push(
1866 crate::storage::schema::Constraint::new(
1867 "not_null_created_at".to_string(),
1868 crate::storage::schema::ConstraintType::NotNull,
1869 )
1870 .on_columns(vec!["created_at".to_string()]),
1871 );
1872 table.constraints.push(
1873 crate::storage::schema::Constraint::new(
1874 "not_null_updated_at".to_string(),
1875 crate::storage::schema::ConstraintType::NotNull,
1876 )
1877 .on_columns(vec!["updated_at".to_string()]),
1878 );
1879 }
1880 table
1881 .validate()
1882 .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
1883 Ok(table)
1884}
1885
1886fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
1887 let data_type = resolve_declared_data_type(&column.data_type)
1888 .map_err(|err| RedDBError::Query(err.to_string()))?;
1889 let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
1890 if column.not_null {
1891 column_def = column_def.not_null();
1892 }
1893 if let Some(default) = &column.default {
1894 column_def = column_def.with_default(default.as_bytes().to_vec());
1895 }
1896 if column.compress.unwrap_or(0) > 0 {
1897 column_def = column_def.compressed();
1898 }
1899 if !column.enum_variants.is_empty() {
1900 column_def = column_def.with_variants(column.enum_variants.clone());
1901 }
1902 if let Some(precision) = column.decimal_precision {
1903 column_def = column_def.with_precision(precision);
1904 }
1905 if let Some(element_type) = &column.array_element {
1906 column_def = column_def.with_element_type(
1907 resolve_declared_data_type(element_type)
1908 .map_err(|err| RedDBError::Query(err.to_string()))?,
1909 );
1910 }
1911 column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
1912 if column.unique {
1913 column_def = column_def.with_metadata("unique", "true");
1914 }
1915 if column.primary_key {
1916 column_def = column_def.with_metadata("primary_key", "true");
1917 }
1918 Ok(column_def)
1919}
1920
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
1927
1928#[cfg(test)]
1929mod tests {
1930 use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
1931 use crate::auth::store::{AuthStore, PrincipalRef};
1932 use crate::auth::UserId;
1933 use crate::auth::{AuthConfig, Role};
1934 use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
1935 use crate::storage::schema::Value;
1936 use crate::{RedDBOptions, RedDBRuntime};
1937 use std::sync::Arc;
1938
1939 fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
1940 Policy {
1941 id: id.to_string(),
1942 version: 1,
1943 tenant: None,
1944 created_at: 0,
1945 updated_at: 0,
1946 statements: vec![Statement {
1947 sid: None,
1948 effect: Effect::Allow,
1949 actions: vec![ActionPattern::Exact(action.to_string())],
1950 resources: vec![ResourcePattern::Exact {
1951 kind: "collection".to_string(),
1952 name: collection.to_string(),
1953 }],
1954 condition: None,
1955 }],
1956 }
1957 }
1958
1959 fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
1960 let store = Arc::new(AuthStore::new(AuthConfig::default()));
1961 *rt.inner.auth_store.write() = Some(store.clone());
1962 store
1963 }
1964
1965 #[test]
1966 fn drop_denied_without_iam_policy() {
1967 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
1968 rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
1969 let store = wire_auth_store(&rt);
1970 let select_only = Policy {
1972 id: "select-only".to_string(),
1973 version: 1,
1974 tenant: None,
1975 created_at: 0,
1976 updated_at: 0,
1977 statements: vec![Statement {
1978 sid: None,
1979 effect: Effect::Allow,
1980 actions: vec![ActionPattern::Exact("select".to_string())],
1981 resources: vec![ResourcePattern::Wildcard],
1982 condition: None,
1983 }],
1984 };
1985 store.put_policy_internal(select_only).unwrap();
1986 let alice = UserId::from_parts(None, "alice");
1987 store
1988 .attach_policy(PrincipalRef::User(alice), "select-only")
1989 .unwrap();
1990 set_current_auth_identity("alice".to_string(), Role::Write);
1991 let err = rt.execute_query("DROP TABLE foo").unwrap_err();
1992 clear_current_auth_identity();
1993 assert!(
1994 format!("{err}").contains("denied by IAM policy"),
1995 "got: {err}"
1996 );
1997 }
1998
1999 #[test]
2000 fn drop_allowed_with_explicit_iam_policy() {
2001 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2002 rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2003 let store = wire_auth_store(&rt);
2004 let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2005 store.put_policy_internal(policy).unwrap();
2006 let bob = UserId::from_parts(None, "bob");
2007 store
2008 .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2009 .unwrap();
2010 set_current_auth_identity("bob".to_string(), Role::Write);
2011 rt.execute_query("DROP TABLE bar").unwrap();
2012 clear_current_auth_identity();
2013 }
2014
2015 #[test]
2016 fn drop_allowed_with_wildcard_iam_policy() {
2017 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2018 rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2019 let store = wire_auth_store(&rt);
2020 let policy = Policy {
2021 id: "allow-drop-all".to_string(),
2022 version: 1,
2023 tenant: None,
2024 created_at: 0,
2025 updated_at: 0,
2026 statements: vec![Statement {
2027 sid: None,
2028 effect: Effect::Allow,
2029 actions: vec![ActionPattern::Exact("drop".to_string())],
2030 resources: vec![ResourcePattern::Wildcard],
2031 condition: None,
2032 }],
2033 };
2034 store.put_policy_internal(policy).unwrap();
2035 let carl = UserId::from_parts(None, "carl");
2036 store
2037 .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2038 .unwrap();
2039 set_current_auth_identity("carl".to_string(), Role::Write);
2040 rt.execute_query("DROP TABLE baz").unwrap();
2041 clear_current_auth_identity();
2042 }
2043
2044 #[test]
2045 fn truncate_denied_without_iam_policy() {
2046 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2047 rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2048 let store = wire_auth_store(&rt);
2049 let select_only = Policy {
2051 id: "select-only-2".to_string(),
2052 version: 1,
2053 tenant: None,
2054 created_at: 0,
2055 updated_at: 0,
2056 statements: vec![Statement {
2057 sid: None,
2058 effect: Effect::Allow,
2059 actions: vec![ActionPattern::Exact("select".to_string())],
2060 resources: vec![ResourcePattern::Wildcard],
2061 condition: None,
2062 }],
2063 };
2064 store.put_policy_internal(select_only).unwrap();
2065 let dana = UserId::from_parts(None, "dana");
2066 store
2067 .attach_policy(PrincipalRef::User(dana), "select-only-2")
2068 .unwrap();
2069 set_current_auth_identity("dana".to_string(), Role::Write);
2070 let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2071 clear_current_auth_identity();
2072 assert!(
2073 format!("{err}").contains("denied by IAM policy"),
2074 "got: {err}"
2075 );
2076 }
2077
2078 #[test]
2079 fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2080 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2081 rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2082 .unwrap();
2083 rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2084 .unwrap();
2085 rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2086 .unwrap();
2087
2088 let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2089 assert_eq!(truncated.statement_type, "truncate");
2090 assert_eq!(truncated.affected_rows, 0);
2091
2092 let empty = rt.execute_query("SELECT id FROM users").unwrap();
2093 assert!(empty.result.records.is_empty());
2094
2095 rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2096 .unwrap();
2097 let selected = rt
2098 .execute_query("SELECT name FROM users WHERE id = 3")
2099 .unwrap();
2100 let name = selected.result.records[0].get("name").unwrap();
2101 assert_eq!(name, &Value::text("cy"));
2102 assert!(rt.db().collection_contract("users").is_some());
2103 assert!(rt
2104 .inner
2105 .index_store
2106 .list_indices("users")
2107 .iter()
2108 .any(|index| index.name == "idx_users_id"));
2109 }
2110
2111 #[test]
2112 fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2113 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2114 rt.execute_query("CREATE QUEUE tasks").unwrap();
2115 rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2116
2117 let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2118 assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2119
2120 rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2121 let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2122 assert_eq!(
2123 len.result.records[0].get("len"),
2124 Some(&Value::UnsignedInteger(0))
2125 );
2126 }
2127
2128 #[test]
2129 fn truncate_system_schema_is_read_only() {
2130 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2131 let err = rt
2132 .execute_query("TRUNCATE COLLECTION red.collections")
2133 .unwrap_err();
2134 assert!(format!("{err}").contains("system schema is read-only"));
2135 }
2136
2137 fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2140 let result = rt
2141 .execute_query(&format!("QUEUE PEEK {queue} 100"))
2142 .expect("peek queue");
2143 result
2144 .result
2145 .records
2146 .iter()
2147 .map(
2148 |record| match record.get("payload").expect("payload column") {
2149 Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2150 other => panic!("expected JSON queue payload, got {other:?}"),
2151 },
2152 )
2153 .collect()
2154 }
2155
2156 #[test]
2159 fn truncate_event_enabled_table_emits_single_truncate_event() {
2160 let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2161 rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2162 .unwrap();
2163 rt.execute_query(
2164 "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2165 )
2166 .unwrap();
2167
2168 rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2170
2171 rt.execute_query("TRUNCATE TABLE users").unwrap();
2172
2173 let events = queue_payloads(&rt, "users_events");
2174 assert_eq!(
2176 events.len(),
2177 1,
2178 "expected 1 truncate event, got {}",
2179 events.len()
2180 );
2181 let ev = events[0].as_object().expect("event is object");
2182 assert_eq!(
2183 ev.get("op").and_then(crate::json::Value::as_str),
2184 Some("truncate")
2185 );
2186 assert_eq!(
2187 ev.get("collection").and_then(crate::json::Value::as_str),
2188 Some("users")
2189 );
2190 assert_eq!(
2191 ev.get("entities_count")
2192 .and_then(crate::json::Value::as_u64),
2193 Some(3)
2194 );
2195 assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2196 assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2197 assert!(ev
2198 .get("event_id")
2199 .and_then(crate::json::Value::as_str)
2200 .is_some_and(|s| !s.is_empty()));
2201 }
2202
#[test]
/// `TRUNCATE TABLE` on a table created without an event subscription must
/// succeed and leave the table empty.
fn truncate_no_events_collection_emits_nothing() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let statements = [
        "CREATE TABLE plain (id INT, val TEXT)",
        "INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')",
        "TRUNCATE TABLE plain",
    ];
    for statement in statements {
        runtime.execute_query(statement).unwrap();
    }

    // After the truncate a plain SELECT must return no records.
    let remaining = runtime.execute_query("SELECT id FROM plain").unwrap();
    assert!(remaining.result.records.is_empty());
}
2217
#[test]
/// `DROP TABLE` on a table created `WITH EVENTS` must leave exactly one
/// `collection_dropped` event in the target queue (with the final entity
/// count and the `ts` / `lsn` / `event_id` metadata), and later queries
/// against the dropped table must fail.
fn drop_event_enabled_table_emits_single_collection_dropped_event() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let setup = [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events",
        "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')",
        // Pop up to 10 entries before dropping; the assertion below expects
        // only the drop event to remain in the queue.
        "QUEUE POP users_events COUNT 10",
    ];
    for statement in setup {
        runtime.execute_query(statement).unwrap();
    }

    runtime.execute_query("DROP TABLE users").unwrap();

    let payloads = queue_payloads(&runtime, "users_events");
    assert_eq!(
        payloads.len(),
        1,
        "expected 1 collection_dropped event, got {}",
        payloads.len()
    );

    let event = payloads[0].as_object().expect("event is object");

    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("collection_dropped"));

    let collection = event
        .get("collection")
        .and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));

    let final_entities_count = event
        .get("final_entities_count")
        .and_then(crate::json::Value::as_u64);
    assert_eq!(final_entities_count, Some(2));

    // Metadata fields: only their presence (and a non-empty id) is checked.
    let ts = event.get("ts").and_then(crate::json::Value::as_u64);
    assert!(ts.is_some());
    let lsn = event.get("lsn").and_then(crate::json::Value::as_u64);
    assert!(lsn.is_some());
    let event_id = event.get("event_id").and_then(crate::json::Value::as_str);
    assert!(matches!(event_id, Some(id) if !id.is_empty()));

    // Querying the dropped table must error, and the error text must name it.
    let select_err = runtime.execute_query("SELECT id FROM users").unwrap_err();
    let mentions_table = select_err.to_string().contains("users");
    assert!(mentions_table, "expected not-found error");
}
2270
#[test]
/// `DROP TABLE` on a table created without an event subscription must
/// succeed; afterwards a query against the table fails with an error that
/// names it.
fn drop_no_events_collection_emits_nothing() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let statements = [
        "CREATE TABLE plain (id INT, val TEXT)",
        "INSERT INTO plain (id, val) VALUES (1, 'a')",
        "DROP TABLE plain",
    ];
    for statement in statements {
        runtime.execute_query(statement).unwrap();
    }

    let select_err = runtime.execute_query("SELECT id FROM plain").unwrap_err();
    let rendered = select_err.to_string();
    assert!(rendered.contains("plain"));
}
2285
#[test]
/// With `WITH EVENTS (INSERT)`, an insert followed by an update and a delete
/// must leave a single event in the queue, and that event's `op` is `insert`.
fn ops_filter_insert_only_ignores_update_and_delete() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let statements = [
        "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
        "INSERT INTO items (id, val) VALUES (1, 'a')",
        "UPDATE items SET val = 'b' WHERE id = 1",
        "DELETE FROM items WHERE id = 1",
    ];
    for statement in statements {
        runtime.execute_query(statement).unwrap();
    }

    let payloads = queue_payloads(&runtime, "items_events");
    assert_eq!(
        payloads.len(),
        1,
        "expected 1 insert event, got {}",
        payloads.len()
    );

    let only_event = payloads[0].as_object().unwrap();
    let op = only_event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));
}
2319
#[test]
/// With `WITH EVENTS WHERE status = 'active'`, inserting one active and one
/// inactive row must queue a single `insert` event whose `after` image has
/// `status = "active"`.
fn where_filter_skips_rows_that_do_not_match() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let statements = [
        "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
        "INSERT INTO users (id, status) VALUES (1, 'active')",
        "INSERT INTO users (id, status) VALUES (2, 'inactive')",
    ];
    for statement in statements {
        runtime.execute_query(statement).unwrap();
    }

    let payloads = queue_payloads(&runtime, "users_events");
    assert_eq!(
        payloads.len(),
        1,
        "expected 1 event (only active), got {}",
        payloads.len()
    );

    let event = payloads[0].as_object().unwrap();
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));

    // The row image attached to the event must be the matching (active) row.
    let after_value = event.get("after").unwrap();
    let after_row = after_value.as_object().unwrap();
    let status = after_row
        .get("status")
        .and_then(crate::json::Value::as_str);
    assert_eq!(status, Some("active"));
}
2354
#[test]
/// With both an ops filter `(INSERT, UPDATE)` and `WHERE status = 'active'`,
/// the statement sequence below (active insert, inactive insert, update to
/// inactive, delete) must leave exactly one event in the queue, with `op`
/// equal to `insert`.
fn ops_filter_and_where_filter_combined() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let statements = [
        "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
        "INSERT INTO items (id, status) VALUES (1, 'active')",
        "INSERT INTO items (id, status) VALUES (2, 'inactive')",
        "UPDATE items SET status = 'inactive' WHERE id = 1",
        "DELETE FROM items WHERE id = 2",
    ];
    for statement in statements {
        runtime.execute_query(statement).unwrap();
    }

    // The binding must stay named `events`: the assertion message captures it
    // inline via `{events:?}`.
    let events = queue_payloads(&runtime, "items_events");
    assert_eq!(
        events.len(),
        1,
        "expected 1 event, got {}: {events:?}",
        events.len()
    );

    let only_event = events[0].as_object().unwrap();
    let op = only_event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));
}
2393
#[test]
/// With `WITH EVENTS (DELETE) WHERE status = 'active'`, deleting one active
/// and one inactive row must queue a single `delete` event.
fn where_filter_on_delete_checks_before_state() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let statements = [
        "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
        "INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')",
        "DELETE FROM users WHERE id = 1",
        "DELETE FROM users WHERE id = 2",
    ];
    for statement in statements {
        runtime.execute_query(statement).unwrap();
    }

    let payloads = queue_payloads(&runtime, "users_events");
    assert_eq!(
        payloads.len(),
        1,
        "expected 1 delete event, got {}",
        payloads.len()
    );

    let event = payloads[0].as_object().unwrap();
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("delete"));
}
2424
#[test]
/// `ALTER TABLE ... ADD COLUMN` on an event-enabled table must succeed, record
/// the new column in the collection contract, and leave at least one
/// subscription enabled.
fn alter_add_column_on_event_enabled_table_succeeds() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let statements = [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events",
        "ALTER TABLE users ADD COLUMN phone TEXT",
    ];
    for statement in statements {
        runtime.execute_query(statement).unwrap();
    }

    let contract = runtime.db().collection_contract("users").unwrap();

    let has_phone = contract
        .declared_columns
        .iter()
        .any(|column| column.name == "phone");
    assert!(has_phone, "phone column should be in contract");

    let any_enabled = contract
        .subscriptions
        .iter()
        .any(|subscription| subscription.enabled);
    assert!(any_enabled, "subscription should remain enabled");
}
2448
#[test]
/// `ALTER TABLE ... DROP COLUMN` and `ALTER TABLE ... ENABLE ROW LEVEL
/// SECURITY` on an event-enabled table must both succeed, and the event
/// subscription must still be enabled after each of them.
fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query(
        "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
    )
    .unwrap();

    rt.execute_query("ALTER TABLE items DROP COLUMN secret")
        .unwrap();
    let contract = rt.db().collection_contract("items").unwrap();
    assert!(
        !contract.declared_columns.iter().any(|c| c.name == "secret"),
        "secret column should be removed"
    );
    assert!(
        contract.subscriptions.iter().any(|s| s.enabled),
        "subscription should remain enabled"
    );

    rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
        .unwrap();
    // Re-read the contract: the binding above was fetched before the RLS
    // statement ran, so asserting on it would not observe the RLS change.
    let contract = rt.db().collection_contract("items").unwrap();
    assert!(
        contract.subscriptions.iter().any(|s| s.enabled),
        "subscription should remain enabled"
    );
}
2475}