Skip to main content

reddb_server/runtime/
impl_ddl.rs

//! DDL execution: CREATE, DROP and ALTER statements via the SQL AST.
//!
//! Translates DDL statements for tables and the other collection models
//! (vectors, graphs, KV / config / vault collections, ...) into
//! collection-level operations on the underlying `UnifiedStore`.  RedDB
//! uses a flexible schema-on-read model, so column definitions are
//! validated and recorded as metadata rather than enforced as a rigid
//! storage layout.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Name of the vault KV entry that holds a vault collection's own master
/// key, i.e. `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    let mut key_ref = String::from("red.vault.");
    key_ref.push_str(collection);
    key_ref.push_str(".master_key");
    key_ref
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        // Check if the collection already exists.
37        let exists = store.get_collection(&query.name).is_some();
38        if exists {
39            if query.if_not_exists {
40                return Ok(RuntimeQueryResult::ok_message(
41                    raw_query.to_string(),
42                    &format!("table '{}' already exists", query.name),
43                    "create",
44                ));
45            }
46            return Err(RedDBError::Query(format!(
47                "table '{}' already exists",
48                query.name
49            )));
50        }
51
52        // Build and validate the contract before mutating storage so invalid
53        // SQL types / duplicate columns do not leave partial side effects.
54        let contract = collection_contract_from_create_table(query)?;
55        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
56        // Create the collection.
57        store
58            .create_collection(&query.name)
59            .map_err(|err| RedDBError::Internal(err.to_string()))?;
60        for subscription in &contract.subscriptions {
61            ensure_event_target_queue(self, &subscription.target_queue)?;
62        }
63        if let Some(default_ttl_ms) = query.default_ttl_ms {
64            self.inner
65                .db
66                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
67        }
68        self.inner
69            .db
70            .save_collection_contract(contract)
71            .map_err(|err| RedDBError::Internal(err.to_string()))?;
72        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
73            store.set_config_tree(
74                &format!("red.collection_tenants.{}", query.name),
75                &crate::serde_json::Value::String(tenant_id),
76            );
77        }
78        self.inner
79            .db
80            .persist_metadata()
81            .map_err(|err| RedDBError::Internal(err.to_string()))?;
82        self.refresh_table_planner_stats(&query.name);
83        self.invalidate_result_cache();
84        // Issue #120 — feed the create into the schema-vocabulary
85        // reverse index so AskPipeline (#121) sees this collection.
86        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
87        self.schema_vocabulary_apply(
88            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
89                collection: query.name.clone(),
90                columns,
91                type_tags: Vec::new(),
92                description: None,
93            },
94        );
95        // Partition metadata (Phase 2.2 PG parity).
96        //
97        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
98        // clause, stamp the partition config into `red_config` under
99        // `partition.{table}.{by,column}`. Children are registered separately
100        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
101        if let Some(spec) = &query.partition_by {
102            let kind_str = match spec.kind {
103                crate::storage::query::ast::PartitionKind::Range => "range",
104                crate::storage::query::ast::PartitionKind::List => "list",
105                crate::storage::query::ast::PartitionKind::Hash => "hash",
106            };
107            store.set_config_tree(
108                &format!("partition.{}.by", query.name),
109                &crate::serde_json::Value::String(kind_str.to_string()),
110            );
111            store.set_config_tree(
112                &format!("partition.{}.column", query.name),
113                &crate::serde_json::Value::String(spec.column.clone()),
114            );
115        }
116
117        // Table-scoped multi-tenancy (Phase 2.5.4).
118        //
119        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
120        //   1. Persists the `tenant_tables.{table}.column` marker so
121        //      INSERTs can auto-fill and future opens re-hydrate.
122        //   2. Registers the table in the in-memory `tenant_tables`
123        //      HashMap used by the DML auto-fill path.
124        //   3. Installs an implicit RLS policy equivalent to
125        //      `USING (col = CURRENT_TENANT())` across all actions.
126        //   4. Flips `rls_enabled_tables` on so the policy applies.
127        if let Some(col) = &query.tenant_by {
128            store.set_config_tree(
129                &format!("tenant_tables.{}.column", query.name),
130                &crate::serde_json::Value::String(col.clone()),
131            );
132            self.register_tenant_table(&query.name, col);
133        }
134
135        let ttl_suffix = query
136            .default_ttl_ms
137            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
138            .unwrap_or_default();
139
140        let tenant_suffix = query
141            .tenant_by
142            .as_ref()
143            .map(|col| format!(" (tenant-scoped by {col})"))
144            .unwrap_or_default();
145
146        Ok(RuntimeQueryResult::ok_message(
147            raw_query.to_string(),
148            &format!(
149                "table '{}' created{}{}",
150                query.name, ttl_suffix, tenant_suffix
151            ),
152            "create",
153        ))
154    }
155
156    fn execute_create_keyed_collection(
157        &self,
158        raw_query: &str,
159        query: &CreateTableQuery,
160    ) -> RedDBResult<RuntimeQueryResult> {
161        if query.collection_model == CollectionModel::Document {
162            return Err(RedDBError::Query(
163                "NOT_YET_SUPPORTED: CREATE DOCUMENT is not implemented yet; use an auto-created table by inserting JSON rows into a normal table as a workaround".to_string(),
164            ));
165        }
166        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
167        if is_system_schema_name(&query.name) {
168            return Err(RedDBError::Query("system schema is read-only".to_string()));
169        }
170        let store = self.inner.db.store();
171        let label = polymorphic_resolver::model_name(query.collection_model);
172        if store.get_collection(&query.name).is_some() {
173            if query.if_not_exists {
174                return Ok(RuntimeQueryResult::ok_message(
175                    raw_query.to_string(),
176                    &format!("{label} '{}' already exists", query.name),
177                    "create",
178                ));
179            }
180            return Err(RedDBError::Query(format!(
181                "{label} '{}' already exists",
182                query.name
183            )));
184        }
185
186        store
187            .create_collection(&query.name)
188            .map_err(|err| RedDBError::Internal(err.to_string()))?;
189        if query.collection_model == CollectionModel::Vault {
190            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
191            let key_scope = if query.vault_own_master_key {
192                "own"
193            } else {
194                "cluster"
195            };
196            store.set_config_tree(
197                &format!("red.vault.{}.key_scope", query.name),
198                &crate::serde_json::Value::String(key_scope.to_string()),
199            );
200            store.set_config_tree(
201                &format!("red.vault.{}.status", query.name),
202                &crate::serde_json::Value::String("sealed".to_string()),
203            );
204        }
205        self.inner
206            .db
207            .save_collection_contract(keyed_collection_contract(
208                &query.name,
209                query.collection_model,
210            ))
211            .map_err(|err| RedDBError::Internal(err.to_string()))?;
212        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
213            store.set_config_tree(
214                &format!("red.collection_tenants.{}", query.name),
215                &crate::serde_json::Value::String(tenant_id),
216            );
217        }
218        self.inner
219            .db
220            .persist_metadata()
221            .map_err(|err| RedDBError::Internal(err.to_string()))?;
222        self.invalidate_result_cache();
223
224        Ok(RuntimeQueryResult::ok_message(
225            raw_query.to_string(),
226            &format!("{label} '{}' created", query.name),
227            "create",
228        ))
229    }
230
231    pub fn execute_create_collection(
232        &self,
233        raw_query: &str,
234        query: &CreateCollectionQuery,
235    ) -> RedDBResult<RuntimeQueryResult> {
236        let model = match query.kind.as_str() {
237            "graph" => CollectionModel::Graph,
238            "document" => CollectionModel::Document,
239            other => {
240                return Err(RedDBError::Query(format!(
241                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
242                )));
243            }
244        };
245        let create = CreateTableQuery {
246            collection_model: model,
247            name: query.name.clone(),
248            columns: Vec::new(),
249            if_not_exists: query.if_not_exists,
250            default_ttl_ms: None,
251            context_index_fields: Vec::new(),
252            context_index_enabled: false,
253            timestamps: false,
254            partition_by: None,
255            tenant_by: None,
256            append_only: false,
257            subscriptions: Vec::new(),
258            vault_own_master_key: false,
259        };
260        self.execute_create_table(raw_query, &create)
261    }
262
263    pub fn execute_create_vector(
264        &self,
265        raw_query: &str,
266        query: &CreateVectorQuery,
267    ) -> RedDBResult<RuntimeQueryResult> {
268        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
269        if is_system_schema_name(&query.name) {
270            return Err(RedDBError::Query("system schema is read-only".to_string()));
271        }
272        let store = self.inner.db.store();
273        if store.get_collection(&query.name).is_some() {
274            if query.if_not_exists {
275                return Ok(RuntimeQueryResult::ok_message(
276                    raw_query.to_string(),
277                    &format!("vector '{}' already exists", query.name),
278                    "create",
279                ));
280            }
281            return Err(RedDBError::Query(format!(
282                "vector '{}' already exists",
283                query.name
284            )));
285        }
286
287        store
288            .create_collection(&query.name)
289            .map_err(|err| RedDBError::Internal(err.to_string()))?;
290        self.inner
291            .db
292            .save_collection_contract(vector_collection_contract(query))
293            .map_err(|err| RedDBError::Internal(err.to_string()))?;
294        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
295            store.set_config_tree(
296                &format!("red.collection_tenants.{}", query.name),
297                &crate::serde_json::Value::String(tenant_id),
298            );
299        }
300        self.inner
301            .db
302            .persist_metadata()
303            .map_err(|err| RedDBError::Internal(err.to_string()))?;
304        self.invalidate_result_cache();
305
306        Ok(RuntimeQueryResult::ok_message(
307            raw_query.to_string(),
308            &format!("vector '{}' created", query.name),
309            "create",
310        ))
311    }
312
313    fn provision_vault_key_material(
314        &self,
315        collection: &str,
316        own_master_key: bool,
317    ) -> RedDBResult<()> {
318        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
319            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
320        })?;
321        if !auth_store.is_vault_backed() {
322            return Err(RedDBError::Query(
323                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
324            ));
325        }
326
327        if auth_store.vault_secret_key().is_none() {
328            let key = crate::auth::store::random_bytes(32);
329            auth_store
330                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
331                .map_err(|err| RedDBError::Query(err.to_string()))?;
332        }
333
334        if own_master_key {
335            let key = crate::auth::store::random_bytes(32);
336            auth_store
337                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
338                .map_err(|err| RedDBError::Query(err.to_string()))?;
339        }
340
341        Ok(())
342    }
343
344    /// Execute DROP TABLE
345    ///
346    /// Drops the collection and all its data from the store.
347    pub fn execute_drop_table(
348        &self,
349        raw_query: &str,
350        query: &DropTableQuery,
351    ) -> RedDBResult<RuntimeQueryResult> {
352        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
353        let store = self.inner.db.store();
354
355        if is_system_schema_name(&query.name) {
356            return Err(RedDBError::Query("system schema is read-only".to_string()));
357        }
358
359        let exists = store.get_collection(&query.name).is_some();
360        if !exists {
361            if query.if_exists {
362                return Ok(RuntimeQueryResult::ok_message(
363                    raw_query.to_string(),
364                    &format!("table '{}' does not exist", query.name),
365                    "drop",
366                ));
367            }
368            return Err(RedDBError::NotFound(format!(
369                "table '{}' not found",
370                query.name
371            )));
372        }
373        let actual =
374            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
375        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
376
377        // Emit 1 collection_dropped event before storage is wiped.
378        // Queue is preserved; subscription is removed with the contract below.
379        let final_count = store
380            .get_collection(&query.name)
381            .map(|manager| manager.query_all(|_| true).len() as u64)
382            .unwrap_or(0);
383        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
384            self,
385            &query.name,
386            final_count,
387        )?;
388
389        let orphaned_indices: Vec<String> = self
390            .inner
391            .index_store
392            .list_indices(&query.name)
393            .into_iter()
394            .map(|index| index.name)
395            .collect();
396        for name in &orphaned_indices {
397            self.inner.index_store.drop_index(name, &query.name);
398        }
399
400        store
401            .drop_collection(&query.name)
402            .map_err(|err| RedDBError::Internal(err.to_string()))?;
403        self.inner.db.invalidate_vector_index(&query.name);
404        self.inner.db.clear_collection_default_ttl_ms(&query.name);
405        self.inner
406            .db
407            .remove_collection_contract(&query.name)
408            .map_err(|err| RedDBError::Internal(err.to_string()))?;
409        self.clear_table_planner_stats(&query.name);
410        self.invalidate_result_cache();
411        // Issue #119: a dropped collection vanishes from every
412        // (tenant, role)'s visible-collections set. Auth is optional in
413        // embedded mode so guard the call.
414        if let Some(store) = self.inner.auth_store.read().clone() {
415            store.invalidate_visible_collections_cache();
416        }
417        self.inner
418            .db
419            .persist_metadata()
420            .map_err(|err| RedDBError::Internal(err.to_string()))?;
421        // Issue #120 — drop both the collection entries *and* every
422        // index entry that was scoped to this collection.  Dropping the
423        // collection wipes columns + collection-name + type-tags +
424        // index hits in one pass via `purge_collection_entries`, so
425        // the explicit `DropIndex` calls would be redundant.
426        self.schema_vocabulary_apply(
427            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
428                collection: query.name.clone(),
429            },
430        );
431
432        Ok(RuntimeQueryResult::ok_message(
433            raw_query.to_string(),
434            &format!("table '{}' dropped", query.name),
435            "drop",
436        ))
437    }
438
439    pub fn execute_drop_graph(
440        &self,
441        raw_query: &str,
442        query: &DropGraphQuery,
443    ) -> RedDBResult<RuntimeQueryResult> {
444        self.execute_drop_typed_collection(
445            raw_query,
446            &query.name,
447            query.if_exists,
448            CollectionModel::Graph,
449            "graph",
450        )
451    }
452
453    pub fn execute_drop_vector(
454        &self,
455        raw_query: &str,
456        query: &DropVectorQuery,
457    ) -> RedDBResult<RuntimeQueryResult> {
458        self.execute_drop_typed_collection(
459            raw_query,
460            &query.name,
461            query.if_exists,
462            CollectionModel::Vector,
463            "vector",
464        )
465    }
466
467    pub fn execute_drop_document(
468        &self,
469        raw_query: &str,
470        query: &DropDocumentQuery,
471    ) -> RedDBResult<RuntimeQueryResult> {
472        self.execute_drop_typed_collection(
473            raw_query,
474            &query.name,
475            query.if_exists,
476            CollectionModel::Document,
477            "document",
478        )
479    }
480
481    pub fn execute_drop_kv(
482        &self,
483        raw_query: &str,
484        query: &DropKvQuery,
485    ) -> RedDBResult<RuntimeQueryResult> {
486        let label = polymorphic_resolver::model_name(query.model);
487        self.execute_drop_typed_collection(
488            raw_query,
489            &query.name,
490            query.if_exists,
491            query.model,
492            label,
493        )
494    }
495
496    pub fn execute_drop_collection(
497        &self,
498        raw_query: &str,
499        query: &DropCollectionQuery,
500    ) -> RedDBResult<RuntimeQueryResult> {
501        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
502        if is_system_schema_name(&query.name) {
503            return Err(RedDBError::Query("system schema is read-only".to_string()));
504        }
505        let store = self.inner.db.store();
506        if store.get_collection(&query.name).is_none() {
507            if query.if_exists {
508                return Ok(RuntimeQueryResult::ok_message(
509                    raw_query.to_string(),
510                    &format!("collection '{}' does not exist", query.name),
511                    "drop",
512                ));
513            }
514            return Err(RedDBError::NotFound(format!(
515                "collection '{}' not found",
516                query.name
517            )));
518        }
519
520        match polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())? {
521            CollectionModel::Table => self.execute_drop_table(
522                raw_query,
523                &DropTableQuery {
524                    name: query.name.clone(),
525                    if_exists: query.if_exists,
526                },
527            ),
528            CollectionModel::TimeSeries => self.execute_drop_timeseries(
529                raw_query,
530                &DropTimeSeriesQuery {
531                    name: query.name.clone(),
532                    if_exists: query.if_exists,
533                },
534            ),
535            CollectionModel::Queue => self.execute_drop_queue(
536                raw_query,
537                &DropQueueQuery {
538                    name: query.name.clone(),
539                    if_exists: query.if_exists,
540                },
541            ),
542            CollectionModel::Graph => self.execute_drop_graph(
543                raw_query,
544                &DropGraphQuery {
545                    name: query.name.clone(),
546                    if_exists: query.if_exists,
547                },
548            ),
549            CollectionModel::Vector => self.execute_drop_vector(
550                raw_query,
551                &DropVectorQuery {
552                    name: query.name.clone(),
553                    if_exists: query.if_exists,
554                },
555            ),
556            CollectionModel::Document => self.execute_drop_document(
557                raw_query,
558                &DropDocumentQuery {
559                    name: query.name.clone(),
560                    if_exists: query.if_exists,
561                },
562            ),
563            CollectionModel::Kv => self.execute_drop_kv(
564                raw_query,
565                &DropKvQuery {
566                    name: query.name.clone(),
567                    if_exists: query.if_exists,
568                    model: CollectionModel::Kv,
569                },
570            ),
571            CollectionModel::Config => self.execute_drop_kv(
572                raw_query,
573                &DropKvQuery {
574                    name: query.name.clone(),
575                    if_exists: query.if_exists,
576                    model: CollectionModel::Config,
577                },
578            ),
579            CollectionModel::Vault => self.execute_drop_kv(
580                raw_query,
581                &DropKvQuery {
582                    name: query.name.clone(),
583                    if_exists: query.if_exists,
584                    model: CollectionModel::Vault,
585                },
586            ),
587            CollectionModel::Hll => self.execute_probabilistic_command(
588                raw_query,
589                &ProbabilisticCommand::DropHll {
590                    name: query.name.clone(),
591                    if_exists: query.if_exists,
592                },
593            ),
594            CollectionModel::Sketch => self.execute_probabilistic_command(
595                raw_query,
596                &ProbabilisticCommand::DropSketch {
597                    name: query.name.clone(),
598                    if_exists: query.if_exists,
599                },
600            ),
601            CollectionModel::Filter => self.execute_probabilistic_command(
602                raw_query,
603                &ProbabilisticCommand::DropFilter {
604                    name: query.name.clone(),
605                    if_exists: query.if_exists,
606                },
607            ),
608            CollectionModel::Mixed => self.execute_drop_typed_collection(
609                raw_query,
610                &query.name,
611                query.if_exists,
612                CollectionModel::Mixed,
613                "collection",
614            ),
615        }
616    }
617
618    /// Execute ALTER TABLE
619    ///
620    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
621    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
622    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
623    pub fn execute_alter_table(
624        &self,
625        raw_query: &str,
626        query: &AlterTableQuery,
627    ) -> RedDBResult<RuntimeQueryResult> {
628        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
629        let store = self.inner.db.store();
630
631        // Verify the table exists.
632        if store.get_collection(&query.name).is_none() {
633            return Err(RedDBError::NotFound(format!(
634                "table '{}' not found",
635                query.name
636            )));
637        }
638
639        let mut messages = Vec::new();
640
641        // Collect column-level changes upfront for schema-change event emission below.
642        let fields_added: Vec<String> = query
643            .operations
644            .iter()
645            .filter_map(|op| {
646                if let AlterOperation::AddColumn(col) = op {
647                    Some(col.name.clone())
648                } else {
649                    None
650                }
651            })
652            .collect();
653        let fields_removed: Vec<String> = query
654            .operations
655            .iter()
656            .filter_map(|op| {
657                if let AlterOperation::DropColumn(name) = op {
658                    Some(name.clone())
659                } else {
660                    None
661                }
662            })
663            .collect();
664
665        for op in &query.operations {
666            match op {
667                AlterOperation::AddColumn(col) => {
668                    // Schema-on-read: column will be available on next insert.
669                    messages.push(format!("column '{}' added", col.name));
670                }
671                AlterOperation::DropColumn(name) => {
672                    messages.push(format!("column '{}' dropped", name));
673                }
674                AlterOperation::RenameColumn { from, to } => {
675                    messages.push(format!("column '{}' renamed to '{}'", from, to));
676                }
677                AlterOperation::AttachPartition { child, bound } => {
678                    // Persist child → parent binding in red_config so the
679                    // future planner-side pruner can enumerate children and
680                    // evaluate their bounds.
681                    store.set_config_tree(
682                        &format!("partition.{}.children.{}", query.name, child),
683                        &crate::serde_json::Value::String(bound.clone()),
684                    );
685                    messages.push(format!(
686                        "partition '{child}' attached to '{}' ({bound})",
687                        query.name
688                    ));
689                }
690                AlterOperation::DetachPartition { child } => {
691                    store.set_config_tree(
692                        &format!("partition.{}.children.{}", query.name, child),
693                        &crate::serde_json::Value::Null,
694                    );
695                    messages.push(format!(
696                        "partition '{child}' detached from '{}'",
697                        query.name
698                    ));
699                }
700                AlterOperation::EnableRowLevelSecurity => {
701                    self.inner
702                        .rls_enabled_tables
703                        .write()
704                        .insert(query.name.clone());
705                    // Persist flag so RLS survives restart via red_config.
706                    store.set_config_tree(
707                        &format!("rls.enabled.{}", query.name),
708                        &crate::serde_json::Value::Bool(true),
709                    );
710                    self.invalidate_plan_cache();
711                    messages.push(format!("row level security enabled on '{}'", query.name));
712                }
713                AlterOperation::DisableRowLevelSecurity => {
714                    self.inner.rls_enabled_tables.write().remove(&query.name);
715                    store.set_config_tree(
716                        &format!("rls.enabled.{}", query.name),
717                        &crate::serde_json::Value::Null,
718                    );
719                    self.invalidate_plan_cache();
720                    messages.push(format!("row level security disabled on '{}'", query.name));
721                }
722                // Phase 2.5.4: retrofit tenancy onto an existing table.
723                AlterOperation::EnableTenancy { column } => {
724                    store.set_config_tree(
725                        &format!("tenant_tables.{}.column", query.name),
726                        &crate::serde_json::Value::String(column.clone()),
727                    );
728                    self.register_tenant_table(&query.name, column);
729                    self.invalidate_plan_cache();
730                    messages.push(format!(
731                        "tenancy enabled on '{}' by column '{column}'",
732                        query.name
733                    ));
734                }
735                AlterOperation::DisableTenancy => {
736                    store.set_config_tree(
737                        &format!("tenant_tables.{}.column", query.name),
738                        &crate::serde_json::Value::Null,
739                    );
740                    self.unregister_tenant_table(&query.name);
741                    self.invalidate_plan_cache();
742                    messages.push(format!("tenancy disabled on '{}'", query.name));
743                }
744                AlterOperation::SetAppendOnly(on) => {
745                    // Contract is the single source of truth for the
746                    // UPDATE/DELETE parse-time guard. The flag lands
747                    // below via `apply_alter_operations_to_contract`;
748                    // here we only publish the human-readable message.
749                    messages.push(format!(
750                        "append_only {} on '{}'",
751                        if *on { "enabled" } else { "disabled" },
752                        query.name
753                    ));
754                }
755                AlterOperation::SetVersioned(on) => {
756                    // Opt the collection into (or out of) Git-for-Data.
757                    // Persists a row in red_vcs_settings; next AS OF /
758                    // merge / diff against this table honours the
759                    // flag. Retroactive: existing row versions whose
760                    // xmins are still pinned by commits become
761                    // reachable via AS OF COMMIT immediately.
762                    self.vcs_set_versioned(&query.name, *on)?;
763                    messages.push(format!(
764                        "versioned {} on '{}'",
765                        if *on { "enabled" } else { "disabled" },
766                        query.name
767                    ));
768                }
769                AlterOperation::EnableEvents(subscription) => {
770                    let mut subscription = subscription.clone();
771                    subscription.source = query.name.clone();
772                    validate_event_subscriptions(
773                        self,
774                        &query.name,
775                        std::slice::from_ref(&subscription),
776                    )?;
777                    ensure_event_target_queue(self, &subscription.target_queue)?;
778                    messages.push(format!(
779                        "events enabled on '{}' to '{}'",
780                        query.name, subscription.target_queue
781                    ));
782                }
783                AlterOperation::DisableEvents => {
784                    messages.push(format!("events disabled on '{}'", query.name));
785                }
786                AlterOperation::AddSubscription { name, descriptor } => {
787                    let mut sub = descriptor.clone();
788                    sub.name = name.clone();
789                    sub.source = query.name.clone();
790                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
791                    ensure_event_target_queue(self, &sub.target_queue)?;
792                    messages.push(format!(
793                        "subscription '{}' added on '{}' to '{}'",
794                        name, query.name, sub.target_queue
795                    ));
796                }
797                AlterOperation::DropSubscription { name } => {
798                    messages.push(format!(
799                        "subscription '{}' dropped on '{}'",
800                        name, query.name
801                    ));
802                }
803            }
804        }
805
806        let mut contract = self
807            .inner
808            .db
809            .collection_contract(&query.name)
810            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
811        apply_alter_operations_to_contract(&mut contract, &query.operations);
812        contract.version = contract.version.saturating_add(1);
813        contract.updated_at_unix_ms = current_unix_ms();
814        self.inner
815            .db
816            .save_collection_contract(contract)
817            .map_err(|err| RedDBError::Internal(err.to_string()))?;
818        // Issue #301 — emit OperatorEvent when column schema changes on a
819        // collection that has active event subscriptions, so operators know
820        // downstream consumers may see a different payload shape.
821        if !fields_added.is_empty() || !fields_removed.is_empty() {
822            let sub_names: Vec<String> = self
823                .inner
824                .db
825                .collection_contract(&query.name)
826                .map(|c| {
827                    c.subscriptions
828                        .iter()
829                        .filter(|s| s.enabled)
830                        .map(|s| s.name.clone())
831                        .collect()
832                })
833                .unwrap_or_default();
834            if !sub_names.is_empty() {
835                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
836                    collection: query.name.clone(),
837                    subscription_names: sub_names.join(", "),
838                    fields_added: fields_added.join(", "),
839                    fields_removed: fields_removed.join(", "),
840                    lsn: self.cdc_current_lsn(),
841                }
842                .emit_global();
843            }
844        }
845
846        self.clear_table_planner_stats(&query.name);
847        self.invalidate_result_cache();
848        // Issue #120 — refresh the schema-vocabulary entries from the
849        // post-ALTER contract. Drop+recreate inside the index keeps
850        // the invalidation guarantee complete (no stale columns from
851        // before an ALTER ... DROP COLUMN).
852        let post_alter_columns: Vec<String> = self
853            .inner
854            .db
855            .collection_contract(&query.name)
856            .map(|contract| {
857                contract
858                    .declared_columns
859                    .iter()
860                    .map(|col| col.name.clone())
861                    .collect()
862            })
863            .unwrap_or_default();
864        self.schema_vocabulary_apply(
865            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
866                collection: query.name.clone(),
867                columns: post_alter_columns,
868                type_tags: Vec::new(),
869                description: None,
870            },
871        );
872
873        let message = if messages.is_empty() {
874            format!("table '{}' altered (no operations)", query.name)
875        } else {
876            format!("table '{}' altered: {}", query.name, messages.join(", "))
877        };
878
879        Ok(RuntimeQueryResult::ok_message(
880            raw_query.to_string(),
881            &message,
882            "alter",
883        ))
884    }
885
886    /// Execute EXPLAIN ALTER FOR CREATE TABLE
887    ///
888    /// Pure read: computes the schema diff between the target table's
889    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
890    /// and returns it as SQL `ALTER TABLE` text (default) or structured
891    /// JSON. Never mutates storage.
892    pub fn execute_explain_alter(
893        &self,
894        raw_query: &str,
895        query: &ExplainAlterQuery,
896    ) -> RedDBResult<RuntimeQueryResult> {
897        // Validate the target CREATE TABLE body so syntactically valid
898        // but semantically broken targets (bad SQL types, duplicate
899        // columns) are caught here rather than inside the diff engine.
900        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
901
902        let current_contract = self.inner.db.collection_contract(&query.target.name);
903
904        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
905            .as_ref()
906            .map(|c| c.declared_columns.clone())
907            .unwrap_or_default();
908
909        let diff = super::schema_diff::compute_column_diff(
910            &query.target.name,
911            &current_columns,
912            &query.target.columns,
913        );
914
915        let rendered = match query.format {
916            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
917            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
918        };
919
920        let format_label = match query.format {
921            ExplainFormat::Sql => "sql",
922            ExplainFormat::Json => "json",
923        };
924
925        let columns = vec![
926            "table".to_string(),
927            "format".to_string(),
928            "diff".to_string(),
929        ];
930        let row = vec![
931            ("table".to_string(), Value::text(query.target.name.clone())),
932            ("format".to_string(), Value::text(format_label.to_string())),
933            ("diff".to_string(), Value::text(rendered)),
934        ];
935
936        Ok(RuntimeQueryResult::ok_records(
937            raw_query.to_string(),
938            columns,
939            vec![row],
940            "explain",
941        ))
942    }
943
944    /// Execute CREATE INDEX
945    ///
946    /// Registers a new index on a collection, builds it from existing data,
947    /// and makes it available to the query executor for O(1) lookups.
948    pub fn execute_create_index(
949        &self,
950        raw_query: &str,
951        query: &CreateIndexQuery,
952    ) -> RedDBResult<RuntimeQueryResult> {
953        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
954        let store = self.inner.db.store();
955
956        // Verify the table exists
957        let manager = store
958            .get_collection(&query.table)
959            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
960
961        let method_kind = match query.method {
962            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
963            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
964            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
965            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
966        };
967
968        // Extract fields from existing entities for indexing. Row
969        // entities may arrive in either the "named" HashMap layout
970        // (gRPC `BulkInsertBinary` path) OR the columnar layout
971        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
972        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
973        // leaves `named == None`). Prior to this commit the columnar
974        // branch returned an empty field list, so `CREATE INDEX` built
975        // a zero-entity index over HTTP-inserted data even though the
976        // data was queryable via `SELECT`.
977        let entities = manager.query_all(|_| true);
978        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
979            entities
980                .iter()
981                .map(|e| {
982                    let fields = match &e.data {
983                        crate::storage::EntityData::Row(row) => {
984                            if let Some(ref named) = row.named {
985                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
986                            } else if let Some(ref schema) = row.schema {
987                                // Columnar layout — pair each column
988                                // with its positional name from the
989                                // shared schema Arc.
990                                schema
991                                    .iter()
992                                    .zip(row.columns.iter())
993                                    .map(|(k, v)| (k.clone(), v.clone()))
994                                    .collect()
995                            } else {
996                                Vec::new()
997                            }
998                        }
999                        crate::storage::EntityData::Node(node) => node
1000                            .properties
1001                            .iter()
1002                            .map(|(k, v)| (k.clone(), v.clone()))
1003                            .collect(),
1004                        _ => Vec::new(),
1005                    };
1006                    (e.id, fields)
1007                })
1008                .collect();
1009
1010        // Build the index
1011        let indexed_count = self
1012            .inner
1013            .index_store
1014            .create_index(
1015                &query.name,
1016                &query.table,
1017                &query.columns,
1018                method_kind,
1019                query.unique,
1020                &entity_fields,
1021            )
1022            .map_err(RedDBError::Internal)?;
1023
1024        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1025            &query.table,
1026            &entity_fields,
1027        );
1028        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1029        self.invalidate_plan_cache();
1030
1031        // Register metadata
1032        self.inner
1033            .index_store
1034            .register(super::index_store::RegisteredIndex {
1035                name: query.name.clone(),
1036                collection: query.table.clone(),
1037                columns: query.columns.clone(),
1038                method: method_kind,
1039                unique: query.unique,
1040            });
1041        // Issue #120 — surface the index name + indexed columns in
1042        // the schema-vocabulary so AskPipeline (#121) can resolve
1043        // "the email index" back to its collection.
1044        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1045            collection: query.table.clone(),
1046            index: query.name.clone(),
1047            columns: query.columns.clone(),
1048        });
1049
1050        let method_str = format!("{}", query.method);
1051        let unique_str = if query.unique { "unique " } else { "" };
1052        let cols = query.columns.join(", ");
1053
1054        Ok(RuntimeQueryResult::ok_message(
1055            raw_query.to_string(),
1056            &format!(
1057                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1058                unique_str, query.name, query.table, cols, method_str, indexed_count
1059            ),
1060            "create",
1061        ))
1062    }
1063
1064    /// Execute DROP INDEX
1065    ///
1066    /// Removes an index from a collection.
1067    pub fn execute_drop_index(
1068        &self,
1069        raw_query: &str,
1070        query: &DropIndexQuery,
1071    ) -> RedDBResult<RuntimeQueryResult> {
1072        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1073        let store = self.inner.db.store();
1074
1075        // Verify the table exists
1076        if store.get_collection(&query.table).is_none() {
1077            if query.if_exists {
1078                return Ok(RuntimeQueryResult::ok_message(
1079                    raw_query.to_string(),
1080                    &format!("table '{}' does not exist", query.table),
1081                    "drop",
1082                ));
1083            }
1084            return Err(RedDBError::NotFound(format!(
1085                "table '{}' not found",
1086                query.table
1087            )));
1088        }
1089
1090        // Remove from IndexStore
1091        self.inner.index_store.drop_index(&query.name, &query.table);
1092        self.invalidate_plan_cache();
1093        // Issue #120 — keep the schema-vocabulary index entry in sync.
1094        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1095            collection: query.table.clone(),
1096            index: query.name.clone(),
1097        });
1098
1099        Ok(RuntimeQueryResult::ok_message(
1100            raw_query.to_string(),
1101            &format!("index '{}' dropped from '{}'", query.name, query.table),
1102            "drop",
1103        ))
1104    }
1105
1106    fn execute_drop_typed_collection(
1107        &self,
1108        raw_query: &str,
1109        name: &str,
1110        if_exists: bool,
1111        expected_model: CollectionModel,
1112        label: &str,
1113    ) -> RedDBResult<RuntimeQueryResult> {
1114        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1115        if is_system_schema_name(name) {
1116            return Err(RedDBError::Query("system schema is read-only".to_string()));
1117        }
1118        let store = self.inner.db.store();
1119        if store.get_collection(name).is_none() {
1120            if if_exists {
1121                return Ok(RuntimeQueryResult::ok_message(
1122                    raw_query.to_string(),
1123                    &format!("{label} '{name}' does not exist"),
1124                    "drop",
1125                ));
1126            }
1127            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1128        }
1129
1130        let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1131        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1132        self.drop_collection_storage(raw_query, name, label)
1133    }
1134
1135    pub fn execute_truncate(
1136        &self,
1137        raw_query: &str,
1138        query: &TruncateQuery,
1139    ) -> RedDBResult<RuntimeQueryResult> {
1140        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1141        if is_system_schema_name(&query.name) {
1142            return Err(RedDBError::Query("system schema is read-only".to_string()));
1143        }
1144
1145        let label = query
1146            .model
1147            .map(polymorphic_resolver::model_name)
1148            .unwrap_or("collection");
1149        let store = self.inner.db.store();
1150        if store.get_collection(&query.name).is_none() {
1151            if query.if_exists {
1152                return Ok(RuntimeQueryResult::ok_message(
1153                    raw_query.to_string(),
1154                    &format!("{label} '{}' does not exist", query.name),
1155                    "truncate",
1156                ));
1157            }
1158            return Err(RedDBError::NotFound(format!(
1159                "{label} '{}' not found",
1160                query.name
1161            )));
1162        }
1163
1164        let actual =
1165            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1166        if let Some(expected) = query.model {
1167            polymorphic_resolver::ensure_model_match(expected, actual)?;
1168        }
1169
1170        if actual == CollectionModel::Queue {
1171            return self.execute_queue_command(
1172                raw_query,
1173                &QueueCommand::Purge {
1174                    queue: query.name.clone(),
1175                },
1176            );
1177        }
1178
1179        // Count before wiping so we can emit the aggregated truncate event.
1180        let affected = self.truncate_collection_entities(&query.name)?;
1181        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1182        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1183        self.inner.db.invalidate_vector_index(&query.name);
1184        self.clear_table_planner_stats(&query.name);
1185        self.invalidate_result_cache();
1186
1187        Ok(RuntimeQueryResult::ok_message(
1188            raw_query.to_string(),
1189            &format!(
1190                "{affected} entities truncated from {label} '{}'",
1191                query.name
1192            ),
1193            "truncate",
1194        ))
1195    }
1196
1197    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1198        let store = self.inner.db.store();
1199        let Some(manager) = store.get_collection(name) else {
1200            return Ok(0);
1201        };
1202        let entities = manager.query_all(|_| true);
1203        if entities.is_empty() {
1204            return Ok(0);
1205        }
1206
1207        for entity in &entities {
1208            let fields = entity_index_fields(&entity.data);
1209            self.inner
1210                .index_store
1211                .index_entity_delete(name, entity.id, &fields)
1212                .map_err(RedDBError::Internal)?;
1213        }
1214
1215        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1216        let deleted_ids = store
1217            .delete_batch(name, &ids)
1218            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1219        for id in &deleted_ids {
1220            store.context_index().remove_entity(*id);
1221        }
1222        Ok(deleted_ids.len() as u64)
1223    }
1224
1225    fn drop_collection_storage(
1226        &self,
1227        raw_query: &str,
1228        name: &str,
1229        label: &str,
1230    ) -> RedDBResult<RuntimeQueryResult> {
1231        let store = self.inner.db.store();
1232
1233        // Emit 1 collection_dropped event before storage is wiped.
1234        // Queue is preserved; subscription is removed with the contract below.
1235        let final_count = store
1236            .get_collection(name)
1237            .map(|manager| manager.query_all(|_| true).len() as u64)
1238            .unwrap_or(0);
1239        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1240            self,
1241            name,
1242            final_count,
1243        )?;
1244
1245        let orphaned_indices: Vec<String> = self
1246            .inner
1247            .index_store
1248            .list_indices(name)
1249            .into_iter()
1250            .map(|index| index.name)
1251            .collect();
1252        for index_name in &orphaned_indices {
1253            self.inner.index_store.drop_index(index_name, name);
1254        }
1255
1256        store
1257            .drop_collection(name)
1258            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1259        self.inner.db.invalidate_vector_index(name);
1260        self.inner.db.clear_collection_default_ttl_ms(name);
1261        self.inner
1262            .db
1263            .remove_collection_contract(name)
1264            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1265        self.clear_table_planner_stats(name);
1266        self.invalidate_result_cache();
1267        if let Some(store) = self.inner.auth_store.read().clone() {
1268            store.invalidate_visible_collections_cache();
1269        }
1270        self.inner
1271            .db
1272            .persist_metadata()
1273            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1274        self.schema_vocabulary_apply(
1275            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1276                collection: name.to_string(),
1277            },
1278        );
1279
1280        Ok(RuntimeQueryResult::ok_message(
1281            raw_query.to_string(),
1282            &format!("{label} '{name}' dropped"),
1283            "drop",
1284        ))
1285    }
1286}
1287
/// True when `name` belongs to the read-only system schema.
///
/// That is the bare name `red`, anything under the `red.` namespace, or an
/// internal `__red_schema_*` collection.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const SYSTEM_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];
    if name == "red" {
        return true;
    }
    SYSTEM_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1291
1292fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1293    match data {
1294        EntityData::Row(row) => {
1295            if let Some(ref named) = row.named {
1296                named
1297                    .iter()
1298                    .map(|(key, value)| (key.clone(), value.clone()))
1299                    .collect()
1300            } else if let Some(ref schema) = row.schema {
1301                schema
1302                    .iter()
1303                    .zip(row.columns.iter())
1304                    .map(|(key, value)| (key.clone(), value.clone()))
1305                    .collect()
1306            } else {
1307                Vec::new()
1308            }
1309        }
1310        EntityData::Node(node) => node
1311            .properties
1312            .iter()
1313            .map(|(key, value)| (key.clone(), value.clone()))
1314            .collect(),
1315        _ => Vec::new(),
1316    }
1317}
1318
1319fn collection_contract_from_create_table(
1320    query: &CreateTableQuery,
1321) -> RedDBResult<crate::physical::CollectionContract> {
1322    let now = current_unix_ms();
1323    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1324        .columns
1325        .iter()
1326        .map(declared_column_contract_from_ddl)
1327        .collect();
1328    if query.timestamps {
1329        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1330        // columns that the write path populates from
1331        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1332        declared_columns.push(crate::physical::DeclaredColumnContract {
1333            name: "created_at".to_string(),
1334            data_type: "BIGINT".to_string(),
1335            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1336            not_null: true,
1337            default: None,
1338            compress: None,
1339            unique: false,
1340            primary_key: false,
1341            enum_variants: Vec::new(),
1342            array_element: None,
1343            decimal_precision: None,
1344        });
1345        declared_columns.push(crate::physical::DeclaredColumnContract {
1346            name: "updated_at".to_string(),
1347            data_type: "BIGINT".to_string(),
1348            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1349            not_null: true,
1350            default: None,
1351            compress: None,
1352            unique: false,
1353            primary_key: false,
1354            enum_variants: Vec::new(),
1355            array_element: None,
1356            decimal_precision: None,
1357        });
1358    }
1359    Ok(crate::physical::CollectionContract {
1360        name: query.name.clone(),
1361        declared_model: crate::catalog::CollectionModel::Table,
1362        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1363        origin: crate::physical::ContractOrigin::Explicit,
1364        version: 1,
1365        created_at_unix_ms: now,
1366        updated_at_unix_ms: now,
1367        default_ttl_ms: query.default_ttl_ms,
1368        vector_dimension: None,
1369        vector_metric: None,
1370        context_index_fields: query.context_index_fields.clone(),
1371        declared_columns,
1372        table_def: Some(build_table_def_from_create_table(query)?),
1373        timestamps_enabled: query.timestamps,
1374        context_index_enabled: query.context_index_enabled
1375            || !query.context_index_fields.is_empty(),
1376        append_only: query.append_only,
1377        subscriptions: query.subscriptions.clone(),
1378    })
1379}
1380
1381fn default_collection_contract_for_existing_table(
1382    name: &str,
1383) -> crate::physical::CollectionContract {
1384    let now = current_unix_ms();
1385    crate::physical::CollectionContract {
1386        name: name.to_string(),
1387        declared_model: crate::catalog::CollectionModel::Table,
1388        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1389        origin: crate::physical::ContractOrigin::Explicit,
1390        version: 0,
1391        created_at_unix_ms: now,
1392        updated_at_unix_ms: now,
1393        default_ttl_ms: None,
1394        vector_dimension: None,
1395        vector_metric: None,
1396        context_index_fields: Vec::new(),
1397        declared_columns: Vec::new(),
1398        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1399        timestamps_enabled: false,
1400        context_index_enabled: false,
1401        append_only: false,
1402        subscriptions: Vec::new(),
1403    }
1404}
1405
1406fn keyed_collection_contract(
1407    name: &str,
1408    model: crate::catalog::CollectionModel,
1409) -> crate::physical::CollectionContract {
1410    let now = current_unix_ms();
1411    crate::physical::CollectionContract {
1412        name: name.to_string(),
1413        declared_model: model,
1414        schema_mode: crate::catalog::SchemaMode::Dynamic,
1415        origin: crate::physical::ContractOrigin::Explicit,
1416        version: 1,
1417        created_at_unix_ms: now,
1418        updated_at_unix_ms: now,
1419        default_ttl_ms: None,
1420        vector_dimension: None,
1421        vector_metric: None,
1422        context_index_fields: Vec::new(),
1423        declared_columns: Vec::new(),
1424        table_def: None,
1425        timestamps_enabled: false,
1426        context_index_enabled: false,
1427        append_only: false,
1428        subscriptions: Vec::new(),
1429    }
1430}
1431
1432fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1433    let now = current_unix_ms();
1434    crate::physical::CollectionContract {
1435        name: query.name.clone(),
1436        declared_model: crate::catalog::CollectionModel::Vector,
1437        schema_mode: crate::catalog::SchemaMode::Dynamic,
1438        origin: crate::physical::ContractOrigin::Explicit,
1439        version: 1,
1440        created_at_unix_ms: now,
1441        updated_at_unix_ms: now,
1442        default_ttl_ms: None,
1443        vector_dimension: Some(query.dimension),
1444        vector_metric: Some(query.metric),
1445        context_index_fields: Vec::new(),
1446        declared_columns: Vec::new(),
1447        table_def: None,
1448        timestamps_enabled: false,
1449        context_index_enabled: false,
1450        append_only: false,
1451        subscriptions: Vec::new(),
1452    }
1453}
1454
1455fn declared_column_contract_from_ddl(
1456    column: &CreateColumnDef,
1457) -> crate::physical::DeclaredColumnContract {
1458    crate::physical::DeclaredColumnContract {
1459        name: column.name.clone(),
1460        data_type: column.data_type.clone(),
1461        sql_type: Some(column.sql_type.clone()),
1462        not_null: column.not_null,
1463        default: column.default.clone(),
1464        compress: column.compress,
1465        unique: column.unique,
1466        primary_key: column.primary_key,
1467        enum_variants: column.enum_variants.clone(),
1468        array_element: column.array_element.clone(),
1469        decimal_precision: column.decimal_precision,
1470    }
1471}
1472
1473fn apply_alter_operations_to_contract(
1474    contract: &mut crate::physical::CollectionContract,
1475    operations: &[AlterOperation],
1476) {
1477    if contract.table_def.is_none() {
1478        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1479    }
1480    for operation in operations {
1481        match operation {
1482            AlterOperation::AddColumn(column) => {
1483                if !contract
1484                    .declared_columns
1485                    .iter()
1486                    .any(|existing| existing.name == column.name)
1487                {
1488                    contract
1489                        .declared_columns
1490                        .push(declared_column_contract_from_ddl(column));
1491                }
1492                if let Some(table_def) = contract.table_def.as_mut() {
1493                    if table_def.get_column(&column.name).is_none() {
1494                        if let Ok(column_def) = column_def_from_ddl(column) {
1495                            if column.primary_key {
1496                                table_def.primary_key.push(column.name.clone());
1497                                table_def.constraints.push(
1498                                    crate::storage::schema::Constraint::new(
1499                                        format!("pk_{}", column.name),
1500                                        crate::storage::schema::ConstraintType::PrimaryKey,
1501                                    )
1502                                    .on_columns(vec![column.name.clone()]),
1503                                );
1504                            }
1505                            if column.unique {
1506                                table_def.constraints.push(
1507                                    crate::storage::schema::Constraint::new(
1508                                        format!("uniq_{}", column.name),
1509                                        crate::storage::schema::ConstraintType::Unique,
1510                                    )
1511                                    .on_columns(vec![column.name.clone()]),
1512                                );
1513                            }
1514                            if column.not_null {
1515                                table_def.constraints.push(
1516                                    crate::storage::schema::Constraint::new(
1517                                        format!("not_null_{}", column.name),
1518                                        crate::storage::schema::ConstraintType::NotNull,
1519                                    )
1520                                    .on_columns(vec![column.name.clone()]),
1521                                );
1522                            }
1523                            table_def.columns.push(column_def);
1524                        }
1525                    }
1526                }
1527            }
1528            AlterOperation::DropColumn(name) => {
1529                contract
1530                    .declared_columns
1531                    .retain(|column| column.name != *name);
1532                if let Some(table_def) = contract.table_def.as_mut() {
1533                    if let Some(index) = table_def.column_index(name) {
1534                        table_def.columns.remove(index);
1535                    }
1536                    table_def.primary_key.retain(|column| column != name);
1537                    table_def.constraints.retain(|constraint| {
1538                        !constraint.columns.iter().any(|column| column == name)
1539                    });
1540                    table_def
1541                        .indexes
1542                        .retain(|index| !index.columns.iter().any(|column| column == name));
1543                }
1544            }
1545            AlterOperation::RenameColumn { from, to } => {
1546                if contract
1547                    .declared_columns
1548                    .iter()
1549                    .any(|column| column.name == *to)
1550                {
1551                    continue;
1552                }
1553                if let Some(column) = contract
1554                    .declared_columns
1555                    .iter_mut()
1556                    .find(|column| column.name == *from)
1557                {
1558                    column.name = to.clone();
1559                }
1560                if let Some(table_def) = contract.table_def.as_mut() {
1561                    if let Some(column) = table_def
1562                        .columns
1563                        .iter_mut()
1564                        .find(|column| column.name == *from)
1565                    {
1566                        column.name = to.clone();
1567                    }
1568                    for primary_key in &mut table_def.primary_key {
1569                        if *primary_key == *from {
1570                            *primary_key = to.clone();
1571                        }
1572                    }
1573                    for constraint in &mut table_def.constraints {
1574                        for column in &mut constraint.columns {
1575                            if *column == *from {
1576                                *column = to.clone();
1577                            }
1578                        }
1579                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1580                            for column in ref_columns {
1581                                if *column == *from {
1582                                    *column = to.clone();
1583                                }
1584                            }
1585                        }
1586                    }
1587                    for index in &mut table_def.indexes {
1588                        for column in &mut index.columns {
1589                            if *column == *from {
1590                                *column = to.clone();
1591                            }
1592                        }
1593                    }
1594                }
1595            }
1596            // Partition ops don't touch the column contract — metadata is
1597            // persisted separately via `red_config.partition.*`.
1598            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1599            // RLS toggles don't touch the column contract — flag is persisted
1600            // separately via `red_config.rls.enabled.{table}` and enforced
1601            // through the in-memory `rls_enabled_tables` set.
1602            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1603            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
1604            // and are enforced through `tenant_tables` + RLS auto-policy.
1605            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1606            AlterOperation::SetAppendOnly(on) => {
1607                contract.append_only = *on;
1608            }
1609            // VCS opt-in is persisted to red_vcs_settings by the
1610            // executor, not the contract — nothing to do here.
1611            AlterOperation::SetVersioned(_) => {}
1612            AlterOperation::EnableEvents(subscription) => {
1613                let mut subscription = subscription.clone();
1614                subscription.source = contract.name.clone();
1615                subscription.enabled = true;
1616                if let Some(existing) = contract
1617                    .subscriptions
1618                    .iter_mut()
1619                    .find(|existing| existing.target_queue == subscription.target_queue)
1620                {
1621                    *existing = subscription;
1622                } else {
1623                    contract.subscriptions.push(subscription);
1624                }
1625            }
1626            AlterOperation::DisableEvents => {
1627                for subscription in &mut contract.subscriptions {
1628                    subscription.enabled = false;
1629                }
1630            }
1631            AlterOperation::AddSubscription { name, descriptor } => {
1632                let mut sub = descriptor.clone();
1633                sub.name = name.clone();
1634                sub.source = contract.name.clone();
1635                sub.enabled = true;
1636                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1637                {
1638                    *existing = sub;
1639                } else {
1640                    contract.subscriptions.push(sub);
1641                }
1642            }
1643            AlterOperation::DropSubscription { name } => {
1644                contract.subscriptions.retain(|s| s.name != *name);
1645            }
1646        }
1647    }
1648}
1649
1650fn validate_event_subscriptions(
1651    runtime: &RedDBRuntime,
1652    source: &str,
1653    subscriptions: &[crate::catalog::SubscriptionDescriptor],
1654) -> RedDBResult<()> {
1655    for subscription in subscriptions
1656        .iter()
1657        .filter(|subscription| subscription.enabled)
1658    {
1659        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1660            return Err(RedDBError::Query(
1661                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1662            ));
1663        }
1664        validate_subscription_auth(runtime, source, subscription)?;
1665        if subscription.target_queue == source
1666            || subscription_would_create_cycle(
1667                &runtime.inner.db,
1668                source,
1669                &subscription.target_queue,
1670            )
1671        {
1672            return Err(RedDBError::Query(
1673                "subscription would create cycle".to_string(),
1674            ));
1675        }
1676        audit_subscription_redact_gap(runtime, source, subscription);
1677    }
1678    Ok(())
1679}
1680
1681fn validate_subscription_auth(
1682    runtime: &RedDBRuntime,
1683    source: &str,
1684    subscription: &crate::catalog::SubscriptionDescriptor,
1685) -> RedDBResult<()> {
1686    let auth_store = match runtime.inner.auth_store.read().clone() {
1687        Some(store) => store,
1688        None => return Ok(()),
1689    };
1690    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1691        Some(identity) => identity,
1692        None => return Ok(()),
1693    };
1694    let tenant = crate::runtime::impl_core::current_tenant();
1695    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1696
1697    if auth_store.iam_authorization_enabled() {
1698        let ctx = crate::auth::policies::EvalContext {
1699            principal_tenant: tenant.clone(),
1700            current_tenant: tenant.clone(),
1701            peer_ip: None,
1702            mfa_present: false,
1703            now_ms: crate::auth::now_ms(),
1704            principal_is_admin_role: role == crate::auth::Role::Admin,
1705        };
1706        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
1707        if let Some(t) = tenant.as_deref() {
1708            source_resource = source_resource.with_tenant(t.to_string());
1709        }
1710        if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
1711            return Err(RedDBError::Query(format!(
1712                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
1713                principal, source_resource.kind, source_resource.name
1714            )));
1715        }
1716
1717        let mut target_resource =
1718            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
1719        if let Some(t) = tenant.as_deref() {
1720            target_resource = target_resource.with_tenant(t.to_string());
1721        }
1722        if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
1723            return Err(RedDBError::Query(format!(
1724                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
1725                principal, target_resource.kind, target_resource.name
1726            )));
1727        }
1728        return Ok(());
1729    }
1730
1731    let ctx = crate::auth::privileges::AuthzContext {
1732        principal: &username,
1733        effective_role: role,
1734        tenant: tenant.as_deref(),
1735    };
1736    auth_store
1737        .check_grant(
1738            &ctx,
1739            crate::auth::privileges::Action::Select,
1740            &crate::auth::privileges::Resource::table_from_name(source),
1741        )
1742        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1743    auth_store
1744        .check_grant(
1745            &ctx,
1746            crate::auth::privileges::Action::Insert,
1747            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
1748        )
1749        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1750    Ok(())
1751}
1752
1753fn audit_subscription_redact_gap(
1754    runtime: &RedDBRuntime,
1755    source: &str,
1756    subscription: &crate::catalog::SubscriptionDescriptor,
1757) {
1758    let auth_store = match runtime.inner.auth_store.read().clone() {
1759        Some(store) if store.iam_authorization_enabled() => store,
1760        _ => return,
1761    };
1762    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1763        Some(identity) => identity,
1764        None => return,
1765    };
1766    let tenant = crate::runtime::impl_core::current_tenant();
1767    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1768    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
1769    if missing.is_empty() {
1770        return;
1771    }
1772
1773    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
1774    tracing::warn!(
1775        target: "reddb::operator",
1776        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
1777        source,
1778        subscription.target_queue,
1779        columns
1780    );
1781    let mut event = AuditEvent::builder("subscription_redact_gap")
1782        .principal(username)
1783        .source(AuditAuthSource::System)
1784        .resource(format!(
1785            "subscription:{}->{}",
1786            source, subscription.target_queue
1787        ))
1788        .outcome(Outcome::Success)
1789        .field(AuditFieldEscaper::field("source", source))
1790        .field(AuditFieldEscaper::field(
1791            "target_queue",
1792            subscription.target_queue.clone(),
1793        ))
1794        .field(AuditFieldEscaper::field(
1795            "subscription",
1796            subscription.name.clone(),
1797        ))
1798        .field(AuditFieldEscaper::field("columns", columns))
1799        .field(AuditFieldEscaper::field("role", role.as_str()));
1800    if let Some(t) = tenant {
1801        event = event.tenant(t);
1802    }
1803    runtime.inner.audit_log.record_event(event.build());
1804}
1805
1806fn subscription_redact_gap_columns(
1807    auth_store: &crate::auth::store::AuthStore,
1808    principal: &crate::auth::UserId,
1809    source: &str,
1810    subscription: &crate::catalog::SubscriptionDescriptor,
1811) -> BTreeSet<String> {
1812    let redacted: HashSet<String> = subscription
1813        .redact_fields
1814        .iter()
1815        .map(|field| field.to_ascii_lowercase())
1816        .collect();
1817    auth_store
1818        .effective_policies(principal)
1819        .iter()
1820        .flat_map(|policy| policy.statements.iter())
1821        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
1822        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
1823        .flat_map(|statement| statement.resources.iter())
1824        .filter_map(|resource| denied_column_for_source(resource, source))
1825        .filter(|column| !redact_covers_column(&redacted, source, column))
1826        .collect()
1827}
1828
1829fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
1830    match pattern {
1831        crate::auth::policies::ActionPattern::Wildcard => true,
1832        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
1833        crate::auth::policies::ActionPattern::Prefix(prefix) => {
1834            "select".len() > prefix.len() + 1
1835                && "select".starts_with(prefix)
1836                && "select".as_bytes()[prefix.len()] == b':'
1837        }
1838    }
1839}
1840
1841fn denied_column_for_source(
1842    resource: &crate::auth::policies::ResourcePattern,
1843    source: &str,
1844) -> Option<String> {
1845    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
1846        return None;
1847    };
1848    if kind != "column" {
1849        return None;
1850    }
1851    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
1852    (column.table_resource_name() == source).then_some(column.column)
1853}
1854
/// Returns whether the (already lowercased) redaction set covers `column` of
/// `source`.
///
/// A column is covered by the `*` wildcard, by its bare name, or by its
/// `source.column` qualified name. `source` and `column` are lowercased
/// before the lookup.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
1860
1861fn subscription_would_create_cycle(
1862    db: &crate::storage::unified::devx::RedDB,
1863    source: &str,
1864    target: &str,
1865) -> bool {
1866    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
1867    for contract in db.collection_contracts() {
1868        for subscription in contract
1869            .subscriptions
1870            .into_iter()
1871            .filter(|subscription| subscription.enabled)
1872        {
1873            graph
1874                .entry(subscription.source)
1875                .or_default()
1876                .push(subscription.target_queue);
1877        }
1878    }
1879    graph
1880        .entry(source.to_string())
1881        .or_default()
1882        .push(target.to_string());
1883
1884    let mut stack = vec![target.to_string()];
1885    let mut seen = HashSet::new();
1886    while let Some(node) = stack.pop() {
1887        if node == source {
1888            return true;
1889        }
1890        if !seen.insert(node.clone()) {
1891            continue;
1892        }
1893        if let Some(next) = graph.get(&node) {
1894            stack.extend(next.iter().cloned());
1895        }
1896    }
1897    false
1898}
1899
1900pub(crate) fn ensure_event_target_queue_pub(
1901    runtime: &RedDBRuntime,
1902    queue: &str,
1903) -> RedDBResult<()> {
1904    ensure_event_target_queue(runtime, queue)
1905}
1906
1907fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
1908    let store = runtime.inner.db.store();
1909    if store.get_collection(queue).is_some() {
1910        return Ok(());
1911    }
1912    store
1913        .create_collection(queue)
1914        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1915    runtime
1916        .inner
1917        .db
1918        .save_collection_contract(event_queue_collection_contract(queue))
1919        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1920    store.set_config_tree(
1921        &format!("queue.{queue}.mode"),
1922        &crate::serde_json::Value::String("fanout".to_string()),
1923    );
1924    Ok(())
1925}
1926
1927fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
1928    let now = current_unix_ms();
1929    crate::physical::CollectionContract {
1930        name: queue.to_string(),
1931        declared_model: crate::catalog::CollectionModel::Queue,
1932        schema_mode: crate::catalog::SchemaMode::Dynamic,
1933        origin: crate::physical::ContractOrigin::Implicit,
1934        version: 1,
1935        created_at_unix_ms: now,
1936        updated_at_unix_ms: now,
1937        default_ttl_ms: None,
1938        vector_dimension: None,
1939        vector_metric: None,
1940        context_index_fields: Vec::new(),
1941        declared_columns: Vec::new(),
1942        table_def: None,
1943        timestamps_enabled: false,
1944        context_index_enabled: false,
1945        append_only: true,
1946        subscriptions: Vec::new(),
1947    }
1948}
1949
1950fn build_table_def_from_create_table(
1951    query: &CreateTableQuery,
1952) -> RedDBResult<crate::storage::schema::TableDef> {
1953    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
1954    for column in &query.columns {
1955        if column.primary_key {
1956            table.primary_key.push(column.name.clone());
1957            table.constraints.push(
1958                crate::storage::schema::Constraint::new(
1959                    format!("pk_{}", column.name),
1960                    crate::storage::schema::ConstraintType::PrimaryKey,
1961                )
1962                .on_columns(vec![column.name.clone()]),
1963            );
1964        }
1965        if column.unique {
1966            table.constraints.push(
1967                crate::storage::schema::Constraint::new(
1968                    format!("uniq_{}", column.name),
1969                    crate::storage::schema::ConstraintType::Unique,
1970                )
1971                .on_columns(vec![column.name.clone()]),
1972            );
1973        }
1974        if column.not_null {
1975            table.constraints.push(
1976                crate::storage::schema::Constraint::new(
1977                    format!("not_null_{}", column.name),
1978                    crate::storage::schema::ConstraintType::NotNull,
1979                )
1980                .on_columns(vec![column.name.clone()]),
1981            );
1982        }
1983        table.columns.push(column_def_from_ddl(column)?);
1984    }
1985    // WITH timestamps = true: append the two runtime-managed columns
1986    // to the schema so resolved_contract_columns exposes them to the
1987    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
1988    // not-nullable; the write path auto-fills them.
1989    if query.timestamps {
1990        table.columns.push(
1991            crate::storage::schema::ColumnDef::new(
1992                "created_at".to_string(),
1993                crate::storage::schema::DataType::UnsignedInteger,
1994            )
1995            .not_null(),
1996        );
1997        table.columns.push(
1998            crate::storage::schema::ColumnDef::new(
1999                "updated_at".to_string(),
2000                crate::storage::schema::DataType::UnsignedInteger,
2001            )
2002            .not_null(),
2003        );
2004        table.constraints.push(
2005            crate::storage::schema::Constraint::new(
2006                "not_null_created_at".to_string(),
2007                crate::storage::schema::ConstraintType::NotNull,
2008            )
2009            .on_columns(vec!["created_at".to_string()]),
2010        );
2011        table.constraints.push(
2012            crate::storage::schema::Constraint::new(
2013                "not_null_updated_at".to_string(),
2014                crate::storage::schema::ConstraintType::NotNull,
2015            )
2016            .on_columns(vec!["updated_at".to_string()]),
2017        );
2018    }
2019    table
2020        .validate()
2021        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2022    Ok(table)
2023}
2024
2025fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2026    let data_type = resolve_declared_data_type(&column.data_type)
2027        .map_err(|err| RedDBError::Query(err.to_string()))?;
2028    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2029    if column.not_null {
2030        column_def = column_def.not_null();
2031    }
2032    if let Some(default) = &column.default {
2033        column_def = column_def.with_default(default.as_bytes().to_vec());
2034    }
2035    if column.compress.unwrap_or(0) > 0 {
2036        column_def = column_def.compressed();
2037    }
2038    if !column.enum_variants.is_empty() {
2039        column_def = column_def.with_variants(column.enum_variants.clone());
2040    }
2041    if let Some(precision) = column.decimal_precision {
2042        column_def = column_def.with_precision(precision);
2043    }
2044    if let Some(element_type) = &column.array_element {
2045        column_def = column_def.with_element_type(
2046            resolve_declared_data_type(element_type)
2047                .map_err(|err| RedDBError::Query(err.to_string()))?,
2048        );
2049    }
2050    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2051    if column.unique {
2052        column_def = column_def.with_metadata("unique", "true");
2053    }
2054    if column.primary_key {
2055        column_def = column_def.with_metadata("primary_key", "true");
2056    }
2057    Ok(column_def)
2058}
2059
/// Milliseconds since the Unix epoch according to the system clock.
///
/// Returns `0` if the clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2066
2067#[cfg(test)]
2068mod tests {
2069    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2070    use crate::auth::store::{AuthStore, PrincipalRef};
2071    use crate::auth::UserId;
2072    use crate::auth::{AuthConfig, Role};
2073    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2074    use crate::storage::schema::Value;
2075    use crate::{RedDBOptions, RedDBRuntime};
2076    use std::sync::Arc;
2077
2078    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2079        Policy {
2080            id: id.to_string(),
2081            version: 1,
2082            tenant: None,
2083            created_at: 0,
2084            updated_at: 0,
2085            statements: vec![Statement {
2086                sid: None,
2087                effect: Effect::Allow,
2088                actions: vec![ActionPattern::Exact(action.to_string())],
2089                resources: vec![ResourcePattern::Exact {
2090                    kind: "collection".to_string(),
2091                    name: collection.to_string(),
2092                }],
2093                condition: None,
2094            }],
2095        }
2096    }
2097
2098    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2099        let store = Arc::new(AuthStore::new(AuthConfig::default()));
2100        *rt.inner.auth_store.write() = Some(store.clone());
2101        store
2102    }
2103
2104    #[test]
2105    fn drop_denied_without_iam_policy() {
2106        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2107        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2108        let store = wire_auth_store(&rt);
2109        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
2110        let select_only = Policy {
2111            id: "select-only".to_string(),
2112            version: 1,
2113            tenant: None,
2114            created_at: 0,
2115            updated_at: 0,
2116            statements: vec![Statement {
2117                sid: None,
2118                effect: Effect::Allow,
2119                actions: vec![ActionPattern::Exact("select".to_string())],
2120                resources: vec![ResourcePattern::Wildcard],
2121                condition: None,
2122            }],
2123        };
2124        store.put_policy_internal(select_only).unwrap();
2125        let alice = UserId::from_parts(None, "alice");
2126        store
2127            .attach_policy(PrincipalRef::User(alice), "select-only")
2128            .unwrap();
2129        set_current_auth_identity("alice".to_string(), Role::Write);
2130        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2131        clear_current_auth_identity();
2132        assert!(
2133            format!("{err}").contains("denied by IAM policy"),
2134            "got: {err}"
2135        );
2136    }
2137
2138    #[test]
2139    fn drop_allowed_with_explicit_iam_policy() {
2140        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2141        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2142        let store = wire_auth_store(&rt);
2143        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2144        store.put_policy_internal(policy).unwrap();
2145        let bob = UserId::from_parts(None, "bob");
2146        store
2147            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2148            .unwrap();
2149        set_current_auth_identity("bob".to_string(), Role::Write);
2150        rt.execute_query("DROP TABLE bar").unwrap();
2151        clear_current_auth_identity();
2152    }
2153
2154    #[test]
2155    fn drop_allowed_with_wildcard_iam_policy() {
2156        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2157        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2158        let store = wire_auth_store(&rt);
2159        let policy = Policy {
2160            id: "allow-drop-all".to_string(),
2161            version: 1,
2162            tenant: None,
2163            created_at: 0,
2164            updated_at: 0,
2165            statements: vec![Statement {
2166                sid: None,
2167                effect: Effect::Allow,
2168                actions: vec![ActionPattern::Exact("drop".to_string())],
2169                resources: vec![ResourcePattern::Wildcard],
2170                condition: None,
2171            }],
2172        };
2173        store.put_policy_internal(policy).unwrap();
2174        let carl = UserId::from_parts(None, "carl");
2175        store
2176            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2177            .unwrap();
2178        set_current_auth_identity("carl".to_string(), Role::Write);
2179        rt.execute_query("DROP TABLE baz").unwrap();
2180        clear_current_auth_identity();
2181    }
2182
2183    #[test]
2184    fn truncate_denied_without_iam_policy() {
2185        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2186        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2187        let store = wire_auth_store(&rt);
2188        // A policy exists (IAM active) but gives no truncate right.
2189        let select_only = Policy {
2190            id: "select-only-2".to_string(),
2191            version: 1,
2192            tenant: None,
2193            created_at: 0,
2194            updated_at: 0,
2195            statements: vec![Statement {
2196                sid: None,
2197                effect: Effect::Allow,
2198                actions: vec![ActionPattern::Exact("select".to_string())],
2199                resources: vec![ResourcePattern::Wildcard],
2200                condition: None,
2201            }],
2202        };
2203        store.put_policy_internal(select_only).unwrap();
2204        let dana = UserId::from_parts(None, "dana");
2205        store
2206            .attach_policy(PrincipalRef::User(dana), "select-only-2")
2207            .unwrap();
2208        set_current_auth_identity("dana".to_string(), Role::Write);
2209        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2210        clear_current_auth_identity();
2211        assert!(
2212            format!("{err}").contains("denied by IAM policy"),
2213            "got: {err}"
2214        );
2215    }
2216
2217    #[test]
2218    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2219        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2220        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2221            .unwrap();
2222        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2223            .unwrap();
2224        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2225            .unwrap();
2226
2227        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2228        assert_eq!(truncated.statement_type, "truncate");
2229        assert_eq!(truncated.affected_rows, 0);
2230
2231        let empty = rt.execute_query("SELECT id FROM users").unwrap();
2232        assert!(empty.result.records.is_empty());
2233
2234        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2235            .unwrap();
2236        let selected = rt
2237            .execute_query("SELECT name FROM users WHERE id = 3")
2238            .unwrap();
2239        let name = selected.result.records[0].get("name").unwrap();
2240        assert_eq!(name, &Value::text("cy"));
2241        assert!(rt.db().collection_contract("users").is_some());
2242        assert!(rt
2243            .inner
2244            .index_store
2245            .list_indices("users")
2246            .iter()
2247            .any(|index| index.name == "idx_users_id"));
2248    }
2249
2250    #[test]
2251    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2252        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2253        rt.execute_query("CREATE QUEUE tasks").unwrap();
2254        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2255
2256        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2257        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2258
2259        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2260        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2261        assert_eq!(
2262            len.result.records[0].get("len"),
2263            Some(&Value::UnsignedInteger(0))
2264        );
2265    }
2266
2267    #[test]
2268    fn truncate_system_schema_is_read_only() {
2269        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2270        let err = rt
2271            .execute_query("TRUNCATE COLLECTION red.collections")
2272            .unwrap_err();
2273        assert!(format!("{err}").contains("system schema is read-only"));
2274    }
2275
2276    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────
2277
2278    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2279        let result = rt
2280            .execute_query(&format!("QUEUE PEEK {queue} 100"))
2281            .expect("peek queue");
2282        result
2283            .result
2284            .records
2285            .iter()
2286            .map(
2287                |record| match record.get("payload").expect("payload column") {
2288                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2289                    other => panic!("expected JSON queue payload, got {other:?}"),
2290                },
2291            )
2292            .collect()
2293    }
2294
2295    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
2296    /// `truncate` event, not one delete event per row.
2297    #[test]
2298    fn truncate_event_enabled_table_emits_single_truncate_event() {
2299        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2300        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2301            .unwrap();
2302        rt.execute_query(
2303            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2304        )
2305        .unwrap();
2306
2307        // Drain the 3 insert events so we start clean.
2308        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2309
2310        rt.execute_query("TRUNCATE TABLE users").unwrap();
2311
2312        let events = queue_payloads(&rt, "users_events");
2313        // Must be exactly 1 truncate event, not 3 delete events.
2314        assert_eq!(
2315            events.len(),
2316            1,
2317            "expected 1 truncate event, got {}",
2318            events.len()
2319        );
2320        let ev = events[0].as_object().expect("event is object");
2321        assert_eq!(
2322            ev.get("op").and_then(crate::json::Value::as_str),
2323            Some("truncate")
2324        );
2325        assert_eq!(
2326            ev.get("collection").and_then(crate::json::Value::as_str),
2327            Some("users")
2328        );
2329        assert_eq!(
2330            ev.get("entities_count")
2331                .and_then(crate::json::Value::as_u64),
2332            Some(3)
2333        );
2334        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2335        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2336        assert!(ev
2337            .get("event_id")
2338            .and_then(crate::json::Value::as_str)
2339            .is_some_and(|s| !s.is_empty()));
2340    }
2341
2342    /// `TRUNCATE users` on a collection without event subscription emits no events.
2343    #[test]
2344    fn truncate_no_events_collection_emits_nothing() {
2345        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2346        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2347            .unwrap();
2348        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2349            .unwrap();
2350        // No EVENTS subscription — truncate must work without touching any queue.
2351        rt.execute_query("TRUNCATE TABLE plain").unwrap();
2352        // No crash, no queue to check. Just verify truncation happened.
2353        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2354        assert!(rows.result.records.is_empty());
2355    }
2356
2357    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
2358    /// `collection_dropped` event. The subscription is removed from the
2359    /// source contract but the target queue is preserved for consumer drain.
2360    #[test]
2361    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2362        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2363        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2364            .unwrap();
2365        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2366            .unwrap();
2367
2368        // Drain insert events so we start clean.
2369        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2370
2371        rt.execute_query("DROP TABLE users").unwrap();
2372
2373        // Queue must still exist with 1 collection_dropped event.
2374        let events = queue_payloads(&rt, "users_events");
2375        assert_eq!(
2376            events.len(),
2377            1,
2378            "expected 1 collection_dropped event, got {}",
2379            events.len()
2380        );
2381        let ev = events[0].as_object().expect("event is object");
2382        assert_eq!(
2383            ev.get("op").and_then(crate::json::Value::as_str),
2384            Some("collection_dropped")
2385        );
2386        assert_eq!(
2387            ev.get("collection").and_then(crate::json::Value::as_str),
2388            Some("users")
2389        );
2390        assert_eq!(
2391            ev.get("final_entities_count")
2392                .and_then(crate::json::Value::as_u64),
2393            Some(2)
2394        );
2395        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2396        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2397        assert!(ev
2398            .get("event_id")
2399            .and_then(crate::json::Value::as_str)
2400            .is_some_and(|s| !s.is_empty()));
2401
2402        // Source collection is gone.
2403        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2404        assert!(
2405            format!("{err}").contains("users"),
2406            "expected not-found error"
2407        );
2408    }
2409
2410    /// `DROP TABLE users` on a collection without event subscription works
2411    /// normally with no event emitted.
2412    #[test]
2413    fn drop_no_events_collection_emits_nothing() {
2414        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2415        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2416            .unwrap();
2417        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2418            .unwrap();
2419        rt.execute_query("DROP TABLE plain").unwrap();
2420        // No crash and collection is gone.
2421        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2422        assert!(format!("{err}").contains("plain"));
2423    }
2424
2425    // ── #297: ops_filter + WHERE filter ────────────────────────────────────
2426
2427    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
2428    #[test]
2429    fn ops_filter_insert_only_ignores_update_and_delete() {
2430        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2431        rt.execute_query(
2432            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2433        )
2434        .unwrap();
2435        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
2436            .unwrap();
2437        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
2438            .unwrap();
2439        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
2440
2441        let events = queue_payloads(&rt, "items_events");
2442        // Only the INSERT should have fired.
2443        assert_eq!(
2444            events.len(),
2445            1,
2446            "expected 1 insert event, got {}",
2447            events.len()
2448        );
2449        assert_eq!(
2450            events[0]
2451                .as_object()
2452                .unwrap()
2453                .get("op")
2454                .and_then(crate::json::Value::as_str),
2455            Some("insert")
2456        );
2457    }
2458
2459    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
2460    #[test]
2461    fn where_filter_skips_rows_that_do_not_match() {
2462        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2463        rt.execute_query(
2464            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
2465        )
2466        .unwrap();
2467
2468        // This row should generate an event.
2469        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
2470            .unwrap();
2471        // This row should NOT generate an event.
2472        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
2473            .unwrap();
2474
2475        let events = queue_payloads(&rt, "users_events");
2476        assert_eq!(
2477            events.len(),
2478            1,
2479            "expected 1 event (only active), got {}",
2480            events.len()
2481        );
2482        let ev = events[0].as_object().unwrap();
2483        assert_eq!(
2484            ev.get("op").and_then(crate::json::Value::as_str),
2485            Some("insert")
2486        );
2487        let after = ev.get("after").unwrap().as_object().unwrap();
2488        assert_eq!(
2489            after.get("status").and_then(crate::json::Value::as_str),
2490            Some("active")
2491        );
2492    }
2493
2494    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
2495    #[test]
2496    fn ops_filter_and_where_filter_combined() {
2497        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2498        rt.execute_query(
2499            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
2500        )
2501        .unwrap();
2502
2503        // INSERT active → event
2504        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
2505            .unwrap();
2506        // INSERT inactive → no event
2507        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
2508            .unwrap();
2509        // UPDATE row 1 to inactive → after = inactive, no event
2510        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
2511            .unwrap();
2512        // DELETE → ops_filter excludes it
2513        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
2514
2515        let events = queue_payloads(&rt, "items_events");
2516        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
2517        assert_eq!(
2518            events.len(),
2519            1,
2520            "expected 1 event, got {}: {events:?}",
2521            events.len()
2522        );
2523        assert_eq!(
2524            events[0]
2525                .as_object()
2526                .unwrap()
2527                .get("op")
2528                .and_then(crate::json::Value::as_str),
2529            Some("insert")
2530        );
2531    }
2532
2533    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
2534    #[test]
2535    fn where_filter_on_delete_checks_before_state() {
2536        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2537        rt.execute_query(
2538            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
2539        )
2540        .unwrap();
2541
2542        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
2543            .unwrap();
2544
2545        // Delete active row → event (before-state was active)
2546        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
2547        // Delete inactive row → no event (before-state was inactive)
2548        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
2549
2550        let events = queue_payloads(&rt, "users_events");
2551        assert_eq!(
2552            events.len(),
2553            1,
2554            "expected 1 delete event, got {}",
2555            events.len()
2556        );
2557        let ev = events[0].as_object().unwrap();
2558        assert_eq!(
2559            ev.get("op").and_then(crate::json::Value::as_str),
2560            Some("delete")
2561        );
2562    }
2563
2564    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────
2565
2566    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
2567    #[test]
2568    fn alter_add_column_on_event_enabled_table_succeeds() {
2569        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2570        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2571            .unwrap();
2572        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
2573        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
2574            .unwrap();
2575        // The column is now in the contract.
2576        let contract = rt.db().collection_contract("users").unwrap();
2577        assert!(
2578            contract.declared_columns.iter().any(|c| c.name == "phone"),
2579            "phone column should be in contract"
2580        );
2581        // Subscription still enabled after the alter.
2582        assert!(
2583            contract.subscriptions.iter().any(|s| s.enabled),
2584            "subscription should remain enabled"
2585        );
2586    }
2587
2588    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
2589    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
2590    #[test]
2591    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
2592        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2593        rt.execute_query(
2594            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
2595        )
2596        .unwrap();
2597        // DROP COLUMN — schema change event path exercises, must not error.
2598        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
2599            .unwrap();
2600        let contract = rt.db().collection_contract("items").unwrap();
2601        assert!(
2602            !contract.declared_columns.iter().any(|c| c.name == "secret"),
2603            "secret column should be removed"
2604        );
2605        // ENABLE RLS — non-column op, no schema-change event (coverage).
2606        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
2607            .unwrap();
2608        // Collection and subscription still intact.
2609        assert!(
2610            contract.subscriptions.iter().any(|s| s.enabled),
2611            "subscription should remain enabled"
2612        );
2613    }
2614}