Skip to main content

reddb_server/runtime/
impl_ddl.rs

1//! DDL execution: CREATE TABLE, DROP TABLE, ALTER TABLE via SQL AST
2//!
3//! Translates DDL statements into collection-level operations on the
4//! underlying `UnifiedStore`.  RedDB uses a flexible schema-on-read
5//! model, so column definitions are advisory metadata rather than
6//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Config/vault key under which a vault collection's dedicated master key
/// is stored: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        crate::reserved_fields::ensure_no_reserved_public_item_fields(
37            query.columns.iter().map(|column| column.name.as_str()),
38            &format!("table '{}'", query.name),
39        )?;
40        // Check if the collection already exists.
41        let exists = store.get_collection(&query.name).is_some();
42        if exists {
43            if query.if_not_exists {
44                return Ok(RuntimeQueryResult::ok_message(
45                    raw_query.to_string(),
46                    &format!("table '{}' already exists", query.name),
47                    "create",
48                ));
49            }
50            return Err(RedDBError::Query(format!(
51                "table '{}' already exists",
52                query.name
53            )));
54        }
55
56        // Build and validate the contract before mutating storage so invalid
57        // SQL types / duplicate columns do not leave partial side effects.
58        let contract = collection_contract_from_create_table(query)?;
59        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60        // Create the collection.
61        store
62            .create_collection(&query.name)
63            .map_err(|err| RedDBError::Internal(err.to_string()))?;
64        for subscription in &contract.subscriptions {
65            ensure_event_target_queue(self, &subscription.target_queue)?;
66        }
67        if let Some(default_ttl_ms) = query.default_ttl_ms {
68            self.inner
69                .db
70                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71        }
72        self.inner
73            .db
74            .save_collection_contract(contract)
75            .map_err(|err| RedDBError::Internal(err.to_string()))?;
76        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77            store.set_config_tree(
78                &format!("red.collection_tenants.{}", query.name),
79                &crate::serde_json::Value::String(tenant_id),
80            );
81        }
82        self.inner
83            .db
84            .persist_metadata()
85            .map_err(|err| RedDBError::Internal(err.to_string()))?;
86        self.refresh_table_planner_stats(&query.name);
87        self.invalidate_result_cache();
88        // Issue #120 — feed the create into the schema-vocabulary
89        // reverse index so AskPipeline (#121) sees this collection.
90        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91        self.schema_vocabulary_apply(
92            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93                collection: query.name.clone(),
94                columns,
95                type_tags: Vec::new(),
96                description: None,
97            },
98        );
99        // Partition metadata (Phase 2.2 PG parity).
100        //
101        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
102        // clause, stamp the partition config into `red_config` under
103        // `partition.{table}.{by,column}`. Children are registered separately
104        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
105        if let Some(spec) = &query.partition_by {
106            let kind_str = match spec.kind {
107                crate::storage::query::ast::PartitionKind::Range => "range",
108                crate::storage::query::ast::PartitionKind::List => "list",
109                crate::storage::query::ast::PartitionKind::Hash => "hash",
110            };
111            store.set_config_tree(
112                &format!("partition.{}.by", query.name),
113                &crate::serde_json::Value::String(kind_str.to_string()),
114            );
115            store.set_config_tree(
116                &format!("partition.{}.column", query.name),
117                &crate::serde_json::Value::String(spec.column.clone()),
118            );
119        }
120
121        // Table-scoped multi-tenancy (Phase 2.5.4).
122        //
123        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
124        //   1. Persists the `tenant_tables.{table}.column` marker so
125        //      INSERTs can auto-fill and future opens re-hydrate.
126        //   2. Registers the table in the in-memory `tenant_tables`
127        //      HashMap used by the DML auto-fill path.
128        //   3. Installs an implicit RLS policy equivalent to
129        //      `USING (col = CURRENT_TENANT())` across all actions.
130        //   4. Flips `rls_enabled_tables` on so the policy applies.
131        if let Some(col) = &query.tenant_by {
132            store.set_config_tree(
133                &format!("tenant_tables.{}.column", query.name),
134                &crate::serde_json::Value::String(col.clone()),
135            );
136            self.register_tenant_table(&query.name, col);
137        }
138
139        let ttl_suffix = query
140            .default_ttl_ms
141            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142            .unwrap_or_default();
143
144        let tenant_suffix = query
145            .tenant_by
146            .as_ref()
147            .map(|col| format!(" (tenant-scoped by {col})"))
148            .unwrap_or_default();
149
150        Ok(RuntimeQueryResult::ok_message(
151            raw_query.to_string(),
152            &format!(
153                "table '{}' created{}{}",
154                query.name, ttl_suffix, tenant_suffix
155            ),
156            "create",
157        ))
158    }
159
160    fn execute_create_keyed_collection(
161        &self,
162        raw_query: &str,
163        query: &CreateTableQuery,
164    ) -> RedDBResult<RuntimeQueryResult> {
165        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166        if is_system_schema_name(&query.name) {
167            return Err(RedDBError::Query("system schema is read-only".to_string()));
168        }
169        let store = self.inner.db.store();
170        let label = polymorphic_resolver::model_name(query.collection_model);
171        if store.get_collection(&query.name).is_some() {
172            if query.if_not_exists {
173                return Ok(RuntimeQueryResult::ok_message(
174                    raw_query.to_string(),
175                    &format!("{label} '{}' already exists", query.name),
176                    "create",
177                ));
178            }
179            return Err(RedDBError::Query(format!(
180                "{label} '{}' already exists",
181                query.name
182            )));
183        }
184
185        store
186            .create_collection(&query.name)
187            .map_err(|err| RedDBError::Internal(err.to_string()))?;
188        if query.collection_model == CollectionModel::Vault {
189            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190            let key_scope = if query.vault_own_master_key {
191                "own"
192            } else {
193                "cluster"
194            };
195            store.set_config_tree(
196                &format!("red.vault.{}.key_scope", query.name),
197                &crate::serde_json::Value::String(key_scope.to_string()),
198            );
199            store.set_config_tree(
200                &format!("red.vault.{}.status", query.name),
201                &crate::serde_json::Value::String("sealed".to_string()),
202            );
203        }
204        if query.collection_model == CollectionModel::Metrics {
205            for spec in &query.metrics_rollup_policies {
206                let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207                    .ok_or_else(|| {
208                    RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209                })?;
210                if policy.source != "raw" {
211                    return Err(RedDBError::Query(format!(
212                        "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213                        spec
214                    )));
215                }
216                if !matches!(
217                    policy.aggregation.as_str(),
218                    "avg" | "sum" | "min" | "max" | "count"
219                ) {
220                    return Err(RedDBError::Query(format!(
221                        "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222                        spec
223                    )));
224                }
225            }
226            if let Some(raw_retention_ms) = query.default_ttl_ms {
227                self.inner
228                    .db
229                    .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230                store.set_config_tree(
231                    &format!("red.metrics.{}.raw_retention_ms", query.name),
232                    &crate::serde_json::Value::Number(raw_retention_ms as f64),
233                );
234            }
235            let tenant_identity = query
236                .tenant_by
237                .clone()
238                .unwrap_or_else(|| "current_tenant".to_string());
239            store.set_config_tree(
240                &format!("red.metrics.{}.tenant_identity", query.name),
241                &crate::serde_json::Value::String(tenant_identity),
242            );
243            store.set_config_tree(
244                &format!("red.metrics.{}.namespace", query.name),
245                &crate::serde_json::Value::String("default".to_string()),
246            );
247            if !query.metrics_rollup_policies.is_empty() {
248                store.set_config_tree(
249                    &format!("red.metrics.{}.rollup_policies", query.name),
250                    &crate::serde_json::Value::Array(
251                        query
252                            .metrics_rollup_policies
253                            .iter()
254                            .cloned()
255                            .map(crate::serde_json::Value::String)
256                            .collect(),
257                    ),
258                );
259            }
260        }
261        let contract = if query.collection_model == CollectionModel::Metrics {
262            metrics_collection_contract(query)
263        } else {
264            keyed_collection_contract(&query.name, query.collection_model)
265        };
266        self.inner
267            .db
268            .save_collection_contract(contract)
269            .map_err(|err| RedDBError::Internal(err.to_string()))?;
270        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271            store.set_config_tree(
272                &format!("red.collection_tenants.{}", query.name),
273                &crate::serde_json::Value::String(tenant_id),
274            );
275        }
276        self.inner
277            .db
278            .persist_metadata()
279            .map_err(|err| RedDBError::Internal(err.to_string()))?;
280        self.invalidate_result_cache();
281
282        Ok(RuntimeQueryResult::ok_message(
283            raw_query.to_string(),
284            &format!("{label} '{}' created", query.name),
285            "create",
286        ))
287    }
288
289    pub fn execute_create_collection(
290        &self,
291        raw_query: &str,
292        query: &CreateCollectionQuery,
293    ) -> RedDBResult<RuntimeQueryResult> {
294        let model = match query.kind.as_str() {
295            "graph" => CollectionModel::Graph,
296            "document" => CollectionModel::Document,
297            "metrics" => CollectionModel::Metrics,
298            other => {
299                return Err(RedDBError::Query(format!(
300                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
301                )));
302            }
303        };
304        let create = CreateTableQuery {
305            collection_model: model,
306            name: query.name.clone(),
307            columns: Vec::new(),
308            if_not_exists: query.if_not_exists,
309            default_ttl_ms: None,
310            metrics_rollup_policies: Vec::new(),
311            context_index_fields: Vec::new(),
312            context_index_enabled: false,
313            timestamps: false,
314            partition_by: None,
315            tenant_by: None,
316            append_only: false,
317            subscriptions: Vec::new(),
318            vault_own_master_key: false,
319        };
320        self.execute_create_table(raw_query, &create)
321    }
322
323    pub fn execute_create_vector(
324        &self,
325        raw_query: &str,
326        query: &CreateVectorQuery,
327    ) -> RedDBResult<RuntimeQueryResult> {
328        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
329        if is_system_schema_name(&query.name) {
330            return Err(RedDBError::Query("system schema is read-only".to_string()));
331        }
332        let store = self.inner.db.store();
333        if store.get_collection(&query.name).is_some() {
334            if query.if_not_exists {
335                return Ok(RuntimeQueryResult::ok_message(
336                    raw_query.to_string(),
337                    &format!("vector '{}' already exists", query.name),
338                    "create",
339                ));
340            }
341            return Err(RedDBError::Query(format!(
342                "vector '{}' already exists",
343                query.name
344            )));
345        }
346
347        store
348            .create_collection(&query.name)
349            .map_err(|err| RedDBError::Internal(err.to_string()))?;
350        self.inner
351            .db
352            .save_collection_contract(vector_collection_contract(query))
353            .map_err(|err| RedDBError::Internal(err.to_string()))?;
354        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
355            store.set_config_tree(
356                &format!("red.collection_tenants.{}", query.name),
357                &crate::serde_json::Value::String(tenant_id),
358            );
359        }
360        self.inner
361            .db
362            .persist_metadata()
363            .map_err(|err| RedDBError::Internal(err.to_string()))?;
364        self.invalidate_result_cache();
365
366        Ok(RuntimeQueryResult::ok_message(
367            raw_query.to_string(),
368            &format!("vector '{}' created", query.name),
369            "create",
370        ))
371    }
372
373    fn provision_vault_key_material(
374        &self,
375        collection: &str,
376        own_master_key: bool,
377    ) -> RedDBResult<()> {
378        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
379            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
380        })?;
381        if !auth_store.is_vault_backed() {
382            return Err(RedDBError::Query(
383                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
384            ));
385        }
386
387        if auth_store.vault_secret_key().is_none() {
388            let key = crate::auth::store::random_bytes(32);
389            auth_store
390                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
391                .map_err(|err| RedDBError::Query(err.to_string()))?;
392        }
393
394        if own_master_key {
395            let key = crate::auth::store::random_bytes(32);
396            auth_store
397                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
398                .map_err(|err| RedDBError::Query(err.to_string()))?;
399        }
400
401        Ok(())
402    }
403
404    /// Execute DROP TABLE
405    ///
406    /// Drops the collection and all its data from the store.
407    pub fn execute_drop_table(
408        &self,
409        raw_query: &str,
410        query: &DropTableQuery,
411    ) -> RedDBResult<RuntimeQueryResult> {
412        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
413        let store = self.inner.db.store();
414
415        if is_system_schema_name(&query.name) {
416            return Err(RedDBError::Query("system schema is read-only".to_string()));
417        }
418
419        let exists = store.get_collection(&query.name).is_some();
420        if !exists {
421            if query.if_exists {
422                return Ok(RuntimeQueryResult::ok_message(
423                    raw_query.to_string(),
424                    &format!("table '{}' does not exist", query.name),
425                    "drop",
426                ));
427            }
428            return Err(RedDBError::NotFound(format!(
429                "table '{}' not found",
430                query.name
431            )));
432        }
433        let actual =
434            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
435        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
436
437        // Emit 1 collection_dropped event before storage is wiped.
438        // Queue is preserved; subscription is removed with the contract below.
439        let final_count = store
440            .get_collection(&query.name)
441            .map(|manager| manager.query_all(|_| true).len() as u64)
442            .unwrap_or(0);
443        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
444            self,
445            &query.name,
446            final_count,
447        )?;
448
449        let orphaned_indices: Vec<String> = self
450            .inner
451            .index_store
452            .list_indices(&query.name)
453            .into_iter()
454            .map(|index| index.name)
455            .collect();
456        for name in &orphaned_indices {
457            self.inner.index_store.drop_index(name, &query.name);
458        }
459
460        store
461            .drop_collection(&query.name)
462            .map_err(|err| RedDBError::Internal(err.to_string()))?;
463        self.inner.db.invalidate_vector_index(&query.name);
464        self.inner.db.clear_collection_default_ttl_ms(&query.name);
465        self.inner
466            .db
467            .remove_collection_contract(&query.name)
468            .map_err(|err| RedDBError::Internal(err.to_string()))?;
469        self.clear_table_planner_stats(&query.name);
470        self.invalidate_result_cache();
471        // Issue #119: a dropped collection vanishes from every
472        // (tenant, role)'s visible-collections set. Auth is optional in
473        // embedded mode so guard the call.
474        if let Some(store) = self.inner.auth_store.read().clone() {
475            store.invalidate_visible_collections_cache();
476        }
477        self.inner
478            .db
479            .persist_metadata()
480            .map_err(|err| RedDBError::Internal(err.to_string()))?;
481        // Issue #120 — drop both the collection entries *and* every
482        // index entry that was scoped to this collection.  Dropping the
483        // collection wipes columns + collection-name + type-tags +
484        // index hits in one pass via `purge_collection_entries`, so
485        // the explicit `DropIndex` calls would be redundant.
486        self.schema_vocabulary_apply(
487            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
488                collection: query.name.clone(),
489            },
490        );
491
492        Ok(RuntimeQueryResult::ok_message(
493            raw_query.to_string(),
494            &format!("table '{}' dropped", query.name),
495            "drop",
496        ))
497    }
498
499    pub fn execute_drop_graph(
500        &self,
501        raw_query: &str,
502        query: &DropGraphQuery,
503    ) -> RedDBResult<RuntimeQueryResult> {
504        self.execute_drop_typed_collection(
505            raw_query,
506            &query.name,
507            query.if_exists,
508            CollectionModel::Graph,
509            "graph",
510        )
511    }
512
513    pub fn execute_drop_vector(
514        &self,
515        raw_query: &str,
516        query: &DropVectorQuery,
517    ) -> RedDBResult<RuntimeQueryResult> {
518        self.execute_drop_typed_collection(
519            raw_query,
520            &query.name,
521            query.if_exists,
522            CollectionModel::Vector,
523            "vector",
524        )
525    }
526
527    pub fn execute_drop_document(
528        &self,
529        raw_query: &str,
530        query: &DropDocumentQuery,
531    ) -> RedDBResult<RuntimeQueryResult> {
532        self.execute_drop_typed_collection(
533            raw_query,
534            &query.name,
535            query.if_exists,
536            CollectionModel::Document,
537            "document",
538        )
539    }
540
541    pub fn execute_drop_kv(
542        &self,
543        raw_query: &str,
544        query: &DropKvQuery,
545    ) -> RedDBResult<RuntimeQueryResult> {
546        let label = polymorphic_resolver::model_name(query.model);
547        self.execute_drop_typed_collection(
548            raw_query,
549            &query.name,
550            query.if_exists,
551            query.model,
552            label,
553        )
554    }
555
556    pub fn execute_drop_collection(
557        &self,
558        raw_query: &str,
559        query: &DropCollectionQuery,
560    ) -> RedDBResult<RuntimeQueryResult> {
561        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
562        if is_system_schema_name(&query.name) {
563            return Err(RedDBError::Query("system schema is read-only".to_string()));
564        }
565        let store = self.inner.db.store();
566        if store.get_collection(&query.name).is_none() {
567            if query.if_exists {
568                return Ok(RuntimeQueryResult::ok_message(
569                    raw_query.to_string(),
570                    &format!("collection '{}' does not exist", query.name),
571                    "drop",
572                ));
573            }
574            return Err(RedDBError::NotFound(format!(
575                "collection '{}' not found",
576                query.name
577            )));
578        }
579
580        let actual =
581            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
582        if let Some(expected) = query.model {
583            polymorphic_resolver::ensure_model_match(expected, actual)?;
584        }
585
586        match actual {
587            CollectionModel::Table => self.execute_drop_table(
588                raw_query,
589                &DropTableQuery {
590                    name: query.name.clone(),
591                    if_exists: query.if_exists,
592                },
593            ),
594            CollectionModel::TimeSeries => self.execute_drop_timeseries(
595                raw_query,
596                &DropTimeSeriesQuery {
597                    name: query.name.clone(),
598                    if_exists: query.if_exists,
599                },
600            ),
601            CollectionModel::Queue => self.execute_drop_queue(
602                raw_query,
603                &DropQueueQuery {
604                    name: query.name.clone(),
605                    if_exists: query.if_exists,
606                },
607            ),
608            CollectionModel::Graph => self.execute_drop_graph(
609                raw_query,
610                &DropGraphQuery {
611                    name: query.name.clone(),
612                    if_exists: query.if_exists,
613                },
614            ),
615            CollectionModel::Vector => self.execute_drop_vector(
616                raw_query,
617                &DropVectorQuery {
618                    name: query.name.clone(),
619                    if_exists: query.if_exists,
620                },
621            ),
622            CollectionModel::Document => self.execute_drop_document(
623                raw_query,
624                &DropDocumentQuery {
625                    name: query.name.clone(),
626                    if_exists: query.if_exists,
627                },
628            ),
629            CollectionModel::Kv => self.execute_drop_kv(
630                raw_query,
631                &DropKvQuery {
632                    name: query.name.clone(),
633                    if_exists: query.if_exists,
634                    model: CollectionModel::Kv,
635                },
636            ),
637            CollectionModel::Config => self.execute_drop_kv(
638                raw_query,
639                &DropKvQuery {
640                    name: query.name.clone(),
641                    if_exists: query.if_exists,
642                    model: CollectionModel::Config,
643                },
644            ),
645            CollectionModel::Vault => self.execute_drop_kv(
646                raw_query,
647                &DropKvQuery {
648                    name: query.name.clone(),
649                    if_exists: query.if_exists,
650                    model: CollectionModel::Vault,
651                },
652            ),
653            CollectionModel::Hll => self.execute_probabilistic_command(
654                raw_query,
655                &ProbabilisticCommand::DropHll {
656                    name: query.name.clone(),
657                    if_exists: query.if_exists,
658                },
659            ),
660            CollectionModel::Sketch => self.execute_probabilistic_command(
661                raw_query,
662                &ProbabilisticCommand::DropSketch {
663                    name: query.name.clone(),
664                    if_exists: query.if_exists,
665                },
666            ),
667            CollectionModel::Filter => self.execute_probabilistic_command(
668                raw_query,
669                &ProbabilisticCommand::DropFilter {
670                    name: query.name.clone(),
671                    if_exists: query.if_exists,
672                },
673            ),
674            CollectionModel::Metrics => self.execute_drop_typed_collection(
675                raw_query,
676                &query.name,
677                query.if_exists,
678                CollectionModel::Metrics,
679                "metrics",
680            ),
681            CollectionModel::Mixed => self.execute_drop_typed_collection(
682                raw_query,
683                &query.name,
684                query.if_exists,
685                CollectionModel::Mixed,
686                "collection",
687            ),
688        }
689    }
690
691    /// Execute ALTER TABLE
692    ///
693    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
694    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
695    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
696    pub fn execute_alter_table(
697        &self,
698        raw_query: &str,
699        query: &AlterTableQuery,
700    ) -> RedDBResult<RuntimeQueryResult> {
701        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
702        let store = self.inner.db.store();
703
704        // Verify the table exists.
705        if store.get_collection(&query.name).is_none() {
706            return Err(RedDBError::NotFound(format!(
707                "table '{}' not found",
708                query.name
709            )));
710        }
711
712        let mut messages = Vec::new();
713
714        // Collect column-level changes upfront for schema-change event emission below.
715        let fields_added: Vec<String> = query
716            .operations
717            .iter()
718            .filter_map(|op| {
719                if let AlterOperation::AddColumn(col) = op {
720                    Some(col.name.clone())
721                } else {
722                    None
723                }
724            })
725            .collect();
726        let fields_removed: Vec<String> = query
727            .operations
728            .iter()
729            .filter_map(|op| {
730                if let AlterOperation::DropColumn(name) = op {
731                    Some(name.clone())
732                } else {
733                    None
734                }
735            })
736            .collect();
737
738        for op in &query.operations {
739            match op {
740                AlterOperation::AddColumn(col) => {
741                    // Schema-on-read: column will be available on next insert.
742                    messages.push(format!("column '{}' added", col.name));
743                }
744                AlterOperation::DropColumn(name) => {
745                    messages.push(format!("column '{}' dropped", name));
746                }
747                AlterOperation::RenameColumn { from, to } => {
748                    messages.push(format!("column '{}' renamed to '{}'", from, to));
749                }
750                AlterOperation::AttachPartition { child, bound } => {
751                    // Persist child → parent binding in red_config so the
752                    // future planner-side pruner can enumerate children and
753                    // evaluate their bounds.
754                    store.set_config_tree(
755                        &format!("partition.{}.children.{}", query.name, child),
756                        &crate::serde_json::Value::String(bound.clone()),
757                    );
758                    messages.push(format!(
759                        "partition '{child}' attached to '{}' ({bound})",
760                        query.name
761                    ));
762                }
763                AlterOperation::DetachPartition { child } => {
764                    store.set_config_tree(
765                        &format!("partition.{}.children.{}", query.name, child),
766                        &crate::serde_json::Value::Null,
767                    );
768                    messages.push(format!(
769                        "partition '{child}' detached from '{}'",
770                        query.name
771                    ));
772                }
773                AlterOperation::EnableRowLevelSecurity => {
774                    self.inner
775                        .rls_enabled_tables
776                        .write()
777                        .insert(query.name.clone());
778                    // Persist flag so RLS survives restart via red_config.
779                    store.set_config_tree(
780                        &format!("rls.enabled.{}", query.name),
781                        &crate::serde_json::Value::Bool(true),
782                    );
783                    self.invalidate_plan_cache();
784                    messages.push(format!("row level security enabled on '{}'", query.name));
785                }
786                AlterOperation::DisableRowLevelSecurity => {
787                    self.inner.rls_enabled_tables.write().remove(&query.name);
788                    store.set_config_tree(
789                        &format!("rls.enabled.{}", query.name),
790                        &crate::serde_json::Value::Null,
791                    );
792                    self.invalidate_plan_cache();
793                    messages.push(format!("row level security disabled on '{}'", query.name));
794                }
795                // Phase 2.5.4: retrofit tenancy onto an existing table.
796                AlterOperation::EnableTenancy { column } => {
797                    store.set_config_tree(
798                        &format!("tenant_tables.{}.column", query.name),
799                        &crate::serde_json::Value::String(column.clone()),
800                    );
801                    self.register_tenant_table(&query.name, column);
802                    self.invalidate_plan_cache();
803                    messages.push(format!(
804                        "tenancy enabled on '{}' by column '{column}'",
805                        query.name
806                    ));
807                }
808                AlterOperation::DisableTenancy => {
809                    store.set_config_tree(
810                        &format!("tenant_tables.{}.column", query.name),
811                        &crate::serde_json::Value::Null,
812                    );
813                    self.unregister_tenant_table(&query.name);
814                    self.invalidate_plan_cache();
815                    messages.push(format!("tenancy disabled on '{}'", query.name));
816                }
817                AlterOperation::SetAppendOnly(on) => {
818                    // Contract is the single source of truth for the
819                    // UPDATE/DELETE parse-time guard. The flag lands
820                    // below via `apply_alter_operations_to_contract`;
821                    // here we only publish the human-readable message.
822                    messages.push(format!(
823                        "append_only {} on '{}'",
824                        if *on { "enabled" } else { "disabled" },
825                        query.name
826                    ));
827                }
828                AlterOperation::SetVersioned(on) => {
829                    // Opt the collection into (or out of) Git-for-Data.
830                    // Persists a row in red_vcs_settings; next AS OF /
831                    // merge / diff against this table honours the
832                    // flag. Retroactive: existing row versions whose
833                    // xmins are still pinned by commits become
834                    // reachable via AS OF COMMIT immediately.
835                    self.vcs_set_versioned(&query.name, *on)?;
836                    messages.push(format!(
837                        "versioned {} on '{}'",
838                        if *on { "enabled" } else { "disabled" },
839                        query.name
840                    ));
841                }
842                AlterOperation::EnableEvents(subscription) => {
843                    let mut subscription = subscription.clone();
844                    subscription.source = query.name.clone();
845                    validate_event_subscriptions(
846                        self,
847                        &query.name,
848                        std::slice::from_ref(&subscription),
849                    )?;
850                    ensure_event_target_queue(self, &subscription.target_queue)?;
851                    messages.push(format!(
852                        "events enabled on '{}' to '{}'",
853                        query.name, subscription.target_queue
854                    ));
855                }
856                AlterOperation::DisableEvents => {
857                    messages.push(format!("events disabled on '{}'", query.name));
858                }
859                AlterOperation::AddSubscription { name, descriptor } => {
860                    let mut sub = descriptor.clone();
861                    sub.name = name.clone();
862                    sub.source = query.name.clone();
863                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
864                    ensure_event_target_queue(self, &sub.target_queue)?;
865                    messages.push(format!(
866                        "subscription '{}' added on '{}' to '{}'",
867                        name, query.name, sub.target_queue
868                    ));
869                }
870                AlterOperation::DropSubscription { name } => {
871                    messages.push(format!(
872                        "subscription '{}' dropped on '{}'",
873                        name, query.name
874                    ));
875                }
876            }
877        }
878
879        let mut contract = self
880            .inner
881            .db
882            .collection_contract(&query.name)
883            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
884        apply_alter_operations_to_contract(&mut contract, &query.operations);
885        contract.version = contract.version.saturating_add(1);
886        contract.updated_at_unix_ms = current_unix_ms();
887        self.inner
888            .db
889            .save_collection_contract(contract)
890            .map_err(|err| RedDBError::Internal(err.to_string()))?;
891        // Issue #301 — emit OperatorEvent when column schema changes on a
892        // collection that has active event subscriptions, so operators know
893        // downstream consumers may see a different payload shape.
894        if !fields_added.is_empty() || !fields_removed.is_empty() {
895            let sub_names: Vec<String> = self
896                .inner
897                .db
898                .collection_contract(&query.name)
899                .map(|c| {
900                    c.subscriptions
901                        .iter()
902                        .filter(|s| s.enabled)
903                        .map(|s| s.name.clone())
904                        .collect()
905                })
906                .unwrap_or_default();
907            if !sub_names.is_empty() {
908                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
909                    collection: query.name.clone(),
910                    subscription_names: sub_names.join(", "),
911                    fields_added: fields_added.join(", "),
912                    fields_removed: fields_removed.join(", "),
913                    lsn: self.cdc_current_lsn(),
914                }
915                .emit_global();
916            }
917        }
918
919        self.clear_table_planner_stats(&query.name);
920        self.invalidate_result_cache();
921        // Issue #120 — refresh the schema-vocabulary entries from the
922        // post-ALTER contract. Drop+recreate inside the index keeps
923        // the invalidation guarantee complete (no stale columns from
924        // before an ALTER ... DROP COLUMN).
925        let post_alter_columns: Vec<String> = self
926            .inner
927            .db
928            .collection_contract(&query.name)
929            .map(|contract| {
930                contract
931                    .declared_columns
932                    .iter()
933                    .map(|col| col.name.clone())
934                    .collect()
935            })
936            .unwrap_or_default();
937        self.schema_vocabulary_apply(
938            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
939                collection: query.name.clone(),
940                columns: post_alter_columns,
941                type_tags: Vec::new(),
942                description: None,
943            },
944        );
945
946        let message = if messages.is_empty() {
947            format!("table '{}' altered (no operations)", query.name)
948        } else {
949            format!("table '{}' altered: {}", query.name, messages.join(", "))
950        };
951
952        Ok(RuntimeQueryResult::ok_message(
953            raw_query.to_string(),
954            &message,
955            "alter",
956        ))
957    }
958
959    /// Execute EXPLAIN ALTER FOR CREATE TABLE
960    ///
961    /// Pure read: computes the schema diff between the target table's
962    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
963    /// and returns it as SQL `ALTER TABLE` text (default) or structured
964    /// JSON. Never mutates storage.
965    pub fn execute_explain_alter(
966        &self,
967        raw_query: &str,
968        query: &ExplainAlterQuery,
969    ) -> RedDBResult<RuntimeQueryResult> {
970        // Validate the target CREATE TABLE body so syntactically valid
971        // but semantically broken targets (bad SQL types, duplicate
972        // columns) are caught here rather than inside the diff engine.
973        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
974
975        let current_contract = self.inner.db.collection_contract(&query.target.name);
976
977        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
978            .as_ref()
979            .map(|c| c.declared_columns.clone())
980            .unwrap_or_default();
981
982        let diff = super::schema_diff::compute_column_diff(
983            &query.target.name,
984            &current_columns,
985            &query.target.columns,
986        );
987
988        let rendered = match query.format {
989            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
990            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
991        };
992
993        let format_label = match query.format {
994            ExplainFormat::Sql => "sql",
995            ExplainFormat::Json => "json",
996        };
997
998        let columns = vec![
999            "table".to_string(),
1000            "format".to_string(),
1001            "diff".to_string(),
1002        ];
1003        let row = vec![
1004            ("table".to_string(), Value::text(query.target.name.clone())),
1005            ("format".to_string(), Value::text(format_label.to_string())),
1006            ("diff".to_string(), Value::text(rendered)),
1007        ];
1008
1009        Ok(RuntimeQueryResult::ok_records(
1010            raw_query.to_string(),
1011            columns,
1012            vec![row],
1013            "explain",
1014        ))
1015    }
1016
1017    /// Execute CREATE INDEX
1018    ///
1019    /// Registers a new index on a collection, builds it from existing data,
1020    /// and makes it available to the query executor for O(1) lookups.
1021    pub fn execute_create_index(
1022        &self,
1023        raw_query: &str,
1024        query: &CreateIndexQuery,
1025    ) -> RedDBResult<RuntimeQueryResult> {
1026        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1027        let store = self.inner.db.store();
1028
1029        // Verify the table exists
1030        let manager = store
1031            .get_collection(&query.table)
1032            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1033
1034        let method_kind = match query.method {
1035            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1036            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1037            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1038            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1039        };
1040
1041        // Extract fields from existing entities for indexing. Row
1042        // entities may arrive in either the "named" HashMap layout
1043        // (gRPC `BulkInsertBinary` path) OR the columnar layout
1044        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
1045        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
1046        // leaves `named == None`). Prior to this commit the columnar
1047        // branch returned an empty field list, so `CREATE INDEX` built
1048        // a zero-entity index over HTTP-inserted data even though the
1049        // data was queryable via `SELECT`.
1050        let entities = manager.query_all(|_| true);
1051        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1052            entities
1053                .iter()
1054                .map(|e| {
1055                    let fields = match &e.data {
1056                        crate::storage::EntityData::Row(row) => {
1057                            if let Some(ref named) = row.named {
1058                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1059                            } else if let Some(ref schema) = row.schema {
1060                                // Columnar layout — pair each column
1061                                // with its positional name from the
1062                                // shared schema Arc.
1063                                schema
1064                                    .iter()
1065                                    .zip(row.columns.iter())
1066                                    .map(|(k, v)| (k.clone(), v.clone()))
1067                                    .collect()
1068                            } else {
1069                                Vec::new()
1070                            }
1071                        }
1072                        crate::storage::EntityData::Node(node) => node
1073                            .properties
1074                            .iter()
1075                            .map(|(k, v)| (k.clone(), v.clone()))
1076                            .collect(),
1077                        _ => Vec::new(),
1078                    };
1079                    (e.id, fields)
1080                })
1081                .collect();
1082
1083        // Build the index
1084        let indexed_count = self
1085            .inner
1086            .index_store
1087            .create_index(
1088                &query.name,
1089                &query.table,
1090                &query.columns,
1091                method_kind,
1092                query.unique,
1093                &entity_fields,
1094            )
1095            .map_err(RedDBError::Internal)?;
1096
1097        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1098            &query.table,
1099            &entity_fields,
1100        );
1101        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1102        self.invalidate_plan_cache();
1103
1104        // Register metadata
1105        self.inner
1106            .index_store
1107            .register(super::index_store::RegisteredIndex {
1108                name: query.name.clone(),
1109                collection: query.table.clone(),
1110                columns: query.columns.clone(),
1111                method: method_kind,
1112                unique: query.unique,
1113            });
1114        // Issue #120 — surface the index name + indexed columns in
1115        // the schema-vocabulary so AskPipeline (#121) can resolve
1116        // "the email index" back to its collection.
1117        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1118            collection: query.table.clone(),
1119            index: query.name.clone(),
1120            columns: query.columns.clone(),
1121        });
1122
1123        let method_str = format!("{}", query.method);
1124        let unique_str = if query.unique { "unique " } else { "" };
1125        let cols = query.columns.join(", ");
1126
1127        Ok(RuntimeQueryResult::ok_message(
1128            raw_query.to_string(),
1129            &format!(
1130                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1131                unique_str, query.name, query.table, cols, method_str, indexed_count
1132            ),
1133            "create",
1134        ))
1135    }
1136
1137    /// Execute DROP INDEX
1138    ///
1139    /// Removes an index from a collection.
1140    pub fn execute_drop_index(
1141        &self,
1142        raw_query: &str,
1143        query: &DropIndexQuery,
1144    ) -> RedDBResult<RuntimeQueryResult> {
1145        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1146        let store = self.inner.db.store();
1147
1148        // Verify the table exists
1149        if store.get_collection(&query.table).is_none() {
1150            if query.if_exists {
1151                return Ok(RuntimeQueryResult::ok_message(
1152                    raw_query.to_string(),
1153                    &format!("table '{}' does not exist", query.table),
1154                    "drop",
1155                ));
1156            }
1157            return Err(RedDBError::NotFound(format!(
1158                "table '{}' not found",
1159                query.table
1160            )));
1161        }
1162
1163        // Remove from IndexStore
1164        self.inner.index_store.drop_index(&query.name, &query.table);
1165        self.invalidate_plan_cache();
1166        // Issue #120 — keep the schema-vocabulary index entry in sync.
1167        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1168            collection: query.table.clone(),
1169            index: query.name.clone(),
1170        });
1171
1172        Ok(RuntimeQueryResult::ok_message(
1173            raw_query.to_string(),
1174            &format!("index '{}' dropped from '{}'", query.name, query.table),
1175            "drop",
1176        ))
1177    }
1178
1179    fn execute_drop_typed_collection(
1180        &self,
1181        raw_query: &str,
1182        name: &str,
1183        if_exists: bool,
1184        expected_model: CollectionModel,
1185        label: &str,
1186    ) -> RedDBResult<RuntimeQueryResult> {
1187        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1188        if is_system_schema_name(name) {
1189            return Err(RedDBError::Query("system schema is read-only".to_string()));
1190        }
1191        let store = self.inner.db.store();
1192        if store.get_collection(name).is_none() {
1193            if if_exists {
1194                return Ok(RuntimeQueryResult::ok_message(
1195                    raw_query.to_string(),
1196                    &format!("{label} '{name}' does not exist"),
1197                    "drop",
1198                ));
1199            }
1200            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1201        }
1202
1203        let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1204        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1205        self.drop_collection_storage(raw_query, name, label)
1206    }
1207
1208    pub fn execute_truncate(
1209        &self,
1210        raw_query: &str,
1211        query: &TruncateQuery,
1212    ) -> RedDBResult<RuntimeQueryResult> {
1213        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1214        if is_system_schema_name(&query.name) {
1215            return Err(RedDBError::Query("system schema is read-only".to_string()));
1216        }
1217
1218        let label = query
1219            .model
1220            .map(polymorphic_resolver::model_name)
1221            .unwrap_or("collection");
1222        let store = self.inner.db.store();
1223        if store.get_collection(&query.name).is_none() {
1224            if query.if_exists {
1225                return Ok(RuntimeQueryResult::ok_message(
1226                    raw_query.to_string(),
1227                    &format!("{label} '{}' does not exist", query.name),
1228                    "truncate",
1229                ));
1230            }
1231            return Err(RedDBError::NotFound(format!(
1232                "{label} '{}' not found",
1233                query.name
1234            )));
1235        }
1236
1237        let actual =
1238            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1239        if let Some(expected) = query.model {
1240            polymorphic_resolver::ensure_model_match(expected, actual)?;
1241        }
1242
1243        if actual == CollectionModel::Queue {
1244            return self.execute_queue_command(
1245                raw_query,
1246                &QueueCommand::Purge {
1247                    queue: query.name.clone(),
1248                },
1249            );
1250        }
1251
1252        // Count before wiping so we can emit the aggregated truncate event.
1253        let affected = self.truncate_collection_entities(&query.name)?;
1254        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1255        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1256        self.inner.db.invalidate_vector_index(&query.name);
1257        self.clear_table_planner_stats(&query.name);
1258        self.invalidate_result_cache();
1259
1260        Ok(RuntimeQueryResult::ok_message(
1261            raw_query.to_string(),
1262            &format!(
1263                "{affected} entities truncated from {label} '{}'",
1264                query.name
1265            ),
1266            "truncate",
1267        ))
1268    }
1269
1270    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1271        let store = self.inner.db.store();
1272        let Some(manager) = store.get_collection(name) else {
1273            return Ok(0);
1274        };
1275        let entities = manager.query_all(|_| true);
1276        if entities.is_empty() {
1277            return Ok(0);
1278        }
1279
1280        for entity in &entities {
1281            let fields = entity_index_fields(&entity.data);
1282            self.inner
1283                .index_store
1284                .index_entity_delete(name, entity.id, &fields)
1285                .map_err(RedDBError::Internal)?;
1286        }
1287
1288        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1289        let deleted_ids = store
1290            .delete_batch(name, &ids)
1291            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1292        for id in &deleted_ids {
1293            store.context_index().remove_entity(*id);
1294        }
1295        Ok(deleted_ids.len() as u64)
1296    }
1297
1298    fn drop_collection_storage(
1299        &self,
1300        raw_query: &str,
1301        name: &str,
1302        label: &str,
1303    ) -> RedDBResult<RuntimeQueryResult> {
1304        let store = self.inner.db.store();
1305
1306        // Emit 1 collection_dropped event before storage is wiped.
1307        // Queue is preserved; subscription is removed with the contract below.
1308        let final_count = store
1309            .get_collection(name)
1310            .map(|manager| manager.query_all(|_| true).len() as u64)
1311            .unwrap_or(0);
1312        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1313            self,
1314            name,
1315            final_count,
1316        )?;
1317
1318        let orphaned_indices: Vec<String> = self
1319            .inner
1320            .index_store
1321            .list_indices(name)
1322            .into_iter()
1323            .map(|index| index.name)
1324            .collect();
1325        for index_name in &orphaned_indices {
1326            self.inner.index_store.drop_index(index_name, name);
1327        }
1328
1329        store
1330            .drop_collection(name)
1331            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1332        self.inner.db.invalidate_vector_index(name);
1333        self.inner.db.clear_collection_default_ttl_ms(name);
1334        self.inner
1335            .db
1336            .remove_collection_contract(name)
1337            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1338        self.clear_table_planner_stats(name);
1339        self.invalidate_result_cache();
1340        if let Some(store) = self.inner.auth_store.read().clone() {
1341            store.invalidate_visible_collections_cache();
1342        }
1343        self.inner
1344            .db
1345            .persist_metadata()
1346            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1347        self.schema_vocabulary_apply(
1348            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1349                collection: name.to_string(),
1350            },
1351        );
1352
1353        Ok(RuntimeQueryResult::ok_message(
1354            raw_query.to_string(),
1355            &format!("{label} '{name}' dropped"),
1356            "drop",
1357        ))
1358    }
1359}
1360
/// Returns `true` when `name` belongs to the reserved system schema:
/// exactly `red`, anything under the `red.` namespace, or any name with
/// the `__red_schema_` prefix. DDL callers use it to refuse writes.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];
    name == "red" || RESERVED_PREFIXES.iter().any(|prefix| name.starts_with(prefix))
}
1364
1365fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1366    match data {
1367        EntityData::Row(row) => {
1368            if let Some(ref named) = row.named {
1369                named
1370                    .iter()
1371                    .map(|(key, value)| (key.clone(), value.clone()))
1372                    .collect()
1373            } else if let Some(ref schema) = row.schema {
1374                schema
1375                    .iter()
1376                    .zip(row.columns.iter())
1377                    .map(|(key, value)| (key.clone(), value.clone()))
1378                    .collect()
1379            } else {
1380                Vec::new()
1381            }
1382        }
1383        EntityData::Node(node) => node
1384            .properties
1385            .iter()
1386            .map(|(key, value)| (key.clone(), value.clone()))
1387            .collect(),
1388        _ => Vec::new(),
1389    }
1390}
1391
1392fn collection_contract_from_create_table(
1393    query: &CreateTableQuery,
1394) -> RedDBResult<crate::physical::CollectionContract> {
1395    let now = current_unix_ms();
1396    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1397        .columns
1398        .iter()
1399        .map(declared_column_contract_from_ddl)
1400        .collect();
1401    if query.timestamps {
1402        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1403        // columns that the write path populates from
1404        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1405        declared_columns.push(crate::physical::DeclaredColumnContract {
1406            name: "created_at".to_string(),
1407            data_type: "BIGINT".to_string(),
1408            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1409            not_null: true,
1410            default: None,
1411            compress: None,
1412            unique: false,
1413            primary_key: false,
1414            enum_variants: Vec::new(),
1415            array_element: None,
1416            decimal_precision: None,
1417        });
1418        declared_columns.push(crate::physical::DeclaredColumnContract {
1419            name: "updated_at".to_string(),
1420            data_type: "BIGINT".to_string(),
1421            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1422            not_null: true,
1423            default: None,
1424            compress: None,
1425            unique: false,
1426            primary_key: false,
1427            enum_variants: Vec::new(),
1428            array_element: None,
1429            decimal_precision: None,
1430        });
1431    }
1432    Ok(crate::physical::CollectionContract {
1433        name: query.name.clone(),
1434        declared_model: crate::catalog::CollectionModel::Table,
1435        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1436        origin: crate::physical::ContractOrigin::Explicit,
1437        version: 1,
1438        created_at_unix_ms: now,
1439        updated_at_unix_ms: now,
1440        default_ttl_ms: query.default_ttl_ms,
1441        vector_dimension: None,
1442        vector_metric: None,
1443        context_index_fields: query.context_index_fields.clone(),
1444        declared_columns,
1445        table_def: Some(build_table_def_from_create_table(query)?),
1446        timestamps_enabled: query.timestamps,
1447        context_index_enabled: query.context_index_enabled
1448            || !query.context_index_fields.is_empty(),
1449        metrics_raw_retention_ms: None,
1450        metrics_rollup_policies: Vec::new(),
1451        metrics_tenant_identity: None,
1452        metrics_namespace: None,
1453        append_only: query.append_only,
1454        subscriptions: query.subscriptions.clone(),
1455    })
1456}
1457
1458fn default_collection_contract_for_existing_table(
1459    name: &str,
1460) -> crate::physical::CollectionContract {
1461    let now = current_unix_ms();
1462    crate::physical::CollectionContract {
1463        name: name.to_string(),
1464        declared_model: crate::catalog::CollectionModel::Table,
1465        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1466        origin: crate::physical::ContractOrigin::Explicit,
1467        version: 0,
1468        created_at_unix_ms: now,
1469        updated_at_unix_ms: now,
1470        default_ttl_ms: None,
1471        vector_dimension: None,
1472        vector_metric: None,
1473        context_index_fields: Vec::new(),
1474        declared_columns: Vec::new(),
1475        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1476        timestamps_enabled: false,
1477        context_index_enabled: false,
1478        metrics_raw_retention_ms: None,
1479        metrics_rollup_policies: Vec::new(),
1480        metrics_tenant_identity: None,
1481        metrics_namespace: None,
1482        append_only: false,
1483        subscriptions: Vec::new(),
1484    }
1485}
1486
1487fn keyed_collection_contract(
1488    name: &str,
1489    model: crate::catalog::CollectionModel,
1490) -> crate::physical::CollectionContract {
1491    let now = current_unix_ms();
1492    crate::physical::CollectionContract {
1493        name: name.to_string(),
1494        declared_model: model,
1495        schema_mode: crate::catalog::SchemaMode::Dynamic,
1496        origin: crate::physical::ContractOrigin::Explicit,
1497        version: 1,
1498        created_at_unix_ms: now,
1499        updated_at_unix_ms: now,
1500        default_ttl_ms: None,
1501        vector_dimension: None,
1502        vector_metric: None,
1503        context_index_fields: Vec::new(),
1504        declared_columns: Vec::new(),
1505        table_def: None,
1506        timestamps_enabled: false,
1507        context_index_enabled: false,
1508        metrics_raw_retention_ms: None,
1509        metrics_rollup_policies: Vec::new(),
1510        metrics_tenant_identity: None,
1511        metrics_namespace: None,
1512        append_only: false,
1513        subscriptions: Vec::new(),
1514    }
1515}
1516
1517fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1518    let now = current_unix_ms();
1519    crate::physical::CollectionContract {
1520        name: query.name.clone(),
1521        declared_model: crate::catalog::CollectionModel::Metrics,
1522        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1523        origin: crate::physical::ContractOrigin::Explicit,
1524        version: 1,
1525        created_at_unix_ms: now,
1526        updated_at_unix_ms: now,
1527        default_ttl_ms: query.default_ttl_ms,
1528        vector_dimension: None,
1529        vector_metric: None,
1530        context_index_fields: Vec::new(),
1531        declared_columns: Vec::new(),
1532        table_def: None,
1533        timestamps_enabled: false,
1534        context_index_enabled: false,
1535        metrics_raw_retention_ms: query.default_ttl_ms,
1536        metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1537        metrics_tenant_identity: Some(
1538            query
1539                .tenant_by
1540                .clone()
1541                .unwrap_or_else(|| "current_tenant".to_string()),
1542        ),
1543        metrics_namespace: Some("default".to_string()),
1544        append_only: true,
1545        subscriptions: Vec::new(),
1546    }
1547}
1548
1549fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1550    let now = current_unix_ms();
1551    crate::physical::CollectionContract {
1552        name: query.name.clone(),
1553        declared_model: crate::catalog::CollectionModel::Vector,
1554        schema_mode: crate::catalog::SchemaMode::Dynamic,
1555        origin: crate::physical::ContractOrigin::Explicit,
1556        version: 1,
1557        created_at_unix_ms: now,
1558        updated_at_unix_ms: now,
1559        default_ttl_ms: None,
1560        vector_dimension: Some(query.dimension),
1561        vector_metric: Some(query.metric),
1562        context_index_fields: Vec::new(),
1563        declared_columns: Vec::new(),
1564        table_def: None,
1565        timestamps_enabled: false,
1566        context_index_enabled: false,
1567        metrics_raw_retention_ms: None,
1568        metrics_rollup_policies: Vec::new(),
1569        metrics_tenant_identity: None,
1570        metrics_namespace: None,
1571        append_only: false,
1572        subscriptions: Vec::new(),
1573    }
1574}
1575
1576fn declared_column_contract_from_ddl(
1577    column: &CreateColumnDef,
1578) -> crate::physical::DeclaredColumnContract {
1579    crate::physical::DeclaredColumnContract {
1580        name: column.name.clone(),
1581        data_type: column.data_type.clone(),
1582        sql_type: Some(column.sql_type.clone()),
1583        not_null: column.not_null,
1584        default: column.default.clone(),
1585        compress: column.compress,
1586        unique: column.unique,
1587        primary_key: column.primary_key,
1588        enum_variants: column.enum_variants.clone(),
1589        array_element: column.array_element.clone(),
1590        decimal_precision: column.decimal_precision,
1591    }
1592}
1593
1594fn apply_alter_operations_to_contract(
1595    contract: &mut crate::physical::CollectionContract,
1596    operations: &[AlterOperation],
1597) {
1598    if contract.table_def.is_none() {
1599        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1600    }
1601    for operation in operations {
1602        match operation {
1603            AlterOperation::AddColumn(column) => {
1604                if !contract
1605                    .declared_columns
1606                    .iter()
1607                    .any(|existing| existing.name == column.name)
1608                {
1609                    contract
1610                        .declared_columns
1611                        .push(declared_column_contract_from_ddl(column));
1612                }
1613                if let Some(table_def) = contract.table_def.as_mut() {
1614                    if table_def.get_column(&column.name).is_none() {
1615                        if let Ok(column_def) = column_def_from_ddl(column) {
1616                            if column.primary_key {
1617                                table_def.primary_key.push(column.name.clone());
1618                                table_def.constraints.push(
1619                                    crate::storage::schema::Constraint::new(
1620                                        format!("pk_{}", column.name),
1621                                        crate::storage::schema::ConstraintType::PrimaryKey,
1622                                    )
1623                                    .on_columns(vec![column.name.clone()]),
1624                                );
1625                            }
1626                            if column.unique {
1627                                table_def.constraints.push(
1628                                    crate::storage::schema::Constraint::new(
1629                                        format!("uniq_{}", column.name),
1630                                        crate::storage::schema::ConstraintType::Unique,
1631                                    )
1632                                    .on_columns(vec![column.name.clone()]),
1633                                );
1634                            }
1635                            if column.not_null {
1636                                table_def.constraints.push(
1637                                    crate::storage::schema::Constraint::new(
1638                                        format!("not_null_{}", column.name),
1639                                        crate::storage::schema::ConstraintType::NotNull,
1640                                    )
1641                                    .on_columns(vec![column.name.clone()]),
1642                                );
1643                            }
1644                            table_def.columns.push(column_def);
1645                        }
1646                    }
1647                }
1648            }
1649            AlterOperation::DropColumn(name) => {
1650                contract
1651                    .declared_columns
1652                    .retain(|column| column.name != *name);
1653                if let Some(table_def) = contract.table_def.as_mut() {
1654                    if let Some(index) = table_def.column_index(name) {
1655                        table_def.columns.remove(index);
1656                    }
1657                    table_def.primary_key.retain(|column| column != name);
1658                    table_def.constraints.retain(|constraint| {
1659                        !constraint.columns.iter().any(|column| column == name)
1660                    });
1661                    table_def
1662                        .indexes
1663                        .retain(|index| !index.columns.iter().any(|column| column == name));
1664                }
1665            }
1666            AlterOperation::RenameColumn { from, to } => {
1667                if contract
1668                    .declared_columns
1669                    .iter()
1670                    .any(|column| column.name == *to)
1671                {
1672                    continue;
1673                }
1674                if let Some(column) = contract
1675                    .declared_columns
1676                    .iter_mut()
1677                    .find(|column| column.name == *from)
1678                {
1679                    column.name = to.clone();
1680                }
1681                if let Some(table_def) = contract.table_def.as_mut() {
1682                    if let Some(column) = table_def
1683                        .columns
1684                        .iter_mut()
1685                        .find(|column| column.name == *from)
1686                    {
1687                        column.name = to.clone();
1688                    }
1689                    for primary_key in &mut table_def.primary_key {
1690                        if *primary_key == *from {
1691                            *primary_key = to.clone();
1692                        }
1693                    }
1694                    for constraint in &mut table_def.constraints {
1695                        for column in &mut constraint.columns {
1696                            if *column == *from {
1697                                *column = to.clone();
1698                            }
1699                        }
1700                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1701                            for column in ref_columns {
1702                                if *column == *from {
1703                                    *column = to.clone();
1704                                }
1705                            }
1706                        }
1707                    }
1708                    for index in &mut table_def.indexes {
1709                        for column in &mut index.columns {
1710                            if *column == *from {
1711                                *column = to.clone();
1712                            }
1713                        }
1714                    }
1715                }
1716            }
1717            // Partition ops don't touch the column contract — metadata is
1718            // persisted separately via `red_config.partition.*`.
1719            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1720            // RLS toggles don't touch the column contract — flag is persisted
1721            // separately via `red_config.rls.enabled.{table}` and enforced
1722            // through the in-memory `rls_enabled_tables` set.
1723            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1724            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
1725            // and are enforced through `tenant_tables` + RLS auto-policy.
1726            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1727            AlterOperation::SetAppendOnly(on) => {
1728                contract.append_only = *on;
1729            }
1730            // VCS opt-in is persisted to red_vcs_settings by the
1731            // executor, not the contract — nothing to do here.
1732            AlterOperation::SetVersioned(_) => {}
1733            AlterOperation::EnableEvents(subscription) => {
1734                let mut subscription = subscription.clone();
1735                subscription.source = contract.name.clone();
1736                subscription.enabled = true;
1737                if let Some(existing) = contract
1738                    .subscriptions
1739                    .iter_mut()
1740                    .find(|existing| existing.target_queue == subscription.target_queue)
1741                {
1742                    *existing = subscription;
1743                } else {
1744                    contract.subscriptions.push(subscription);
1745                }
1746            }
1747            AlterOperation::DisableEvents => {
1748                for subscription in &mut contract.subscriptions {
1749                    subscription.enabled = false;
1750                }
1751            }
1752            AlterOperation::AddSubscription { name, descriptor } => {
1753                let mut sub = descriptor.clone();
1754                sub.name = name.clone();
1755                sub.source = contract.name.clone();
1756                sub.enabled = true;
1757                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1758                {
1759                    *existing = sub;
1760                } else {
1761                    contract.subscriptions.push(sub);
1762                }
1763            }
1764            AlterOperation::DropSubscription { name } => {
1765                contract.subscriptions.retain(|s| s.name != *name);
1766            }
1767        }
1768    }
1769}
1770
1771fn validate_event_subscriptions(
1772    runtime: &RedDBRuntime,
1773    source: &str,
1774    subscriptions: &[crate::catalog::SubscriptionDescriptor],
1775) -> RedDBResult<()> {
1776    for subscription in subscriptions
1777        .iter()
1778        .filter(|subscription| subscription.enabled)
1779    {
1780        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1781            return Err(RedDBError::Query(
1782                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1783            ));
1784        }
1785        validate_subscription_auth(runtime, source, subscription)?;
1786        if subscription.target_queue == source
1787            || subscription_would_create_cycle(
1788                &runtime.inner.db,
1789                source,
1790                &subscription.target_queue,
1791            )
1792        {
1793            return Err(RedDBError::Query(
1794                "subscription would create cycle".to_string(),
1795            ));
1796        }
1797        audit_subscription_redact_gap(runtime, source, subscription);
1798    }
1799    Ok(())
1800}
1801
1802fn validate_subscription_auth(
1803    runtime: &RedDBRuntime,
1804    source: &str,
1805    subscription: &crate::catalog::SubscriptionDescriptor,
1806) -> RedDBResult<()> {
1807    let auth_store = match runtime.inner.auth_store.read().clone() {
1808        Some(store) => store,
1809        None => return Ok(()),
1810    };
1811    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1812        Some(identity) => identity,
1813        None => return Ok(()),
1814    };
1815    let tenant = crate::runtime::impl_core::current_tenant();
1816    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1817
1818    if auth_store.iam_authorization_enabled() {
1819        let ctx = crate::auth::policies::EvalContext {
1820            principal_tenant: tenant.clone(),
1821            current_tenant: tenant.clone(),
1822            peer_ip: None,
1823            mfa_present: false,
1824            now_ms: crate::auth::now_ms(),
1825            principal_is_admin_role: role == crate::auth::Role::Admin,
1826        };
1827        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
1828        if let Some(t) = tenant.as_deref() {
1829            source_resource = source_resource.with_tenant(t.to_string());
1830        }
1831        if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
1832            return Err(RedDBError::Query(format!(
1833                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
1834                principal, source_resource.kind, source_resource.name
1835            )));
1836        }
1837
1838        let mut target_resource =
1839            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
1840        if let Some(t) = tenant.as_deref() {
1841            target_resource = target_resource.with_tenant(t.to_string());
1842        }
1843        if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
1844            return Err(RedDBError::Query(format!(
1845                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
1846                principal, target_resource.kind, target_resource.name
1847            )));
1848        }
1849        return Ok(());
1850    }
1851
1852    let ctx = crate::auth::privileges::AuthzContext {
1853        principal: &username,
1854        effective_role: role,
1855        tenant: tenant.as_deref(),
1856    };
1857    auth_store
1858        .check_grant(
1859            &ctx,
1860            crate::auth::privileges::Action::Select,
1861            &crate::auth::privileges::Resource::table_from_name(source),
1862        )
1863        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1864    auth_store
1865        .check_grant(
1866            &ctx,
1867            crate::auth::privileges::Action::Insert,
1868            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
1869        )
1870        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1871    Ok(())
1872}
1873
1874fn audit_subscription_redact_gap(
1875    runtime: &RedDBRuntime,
1876    source: &str,
1877    subscription: &crate::catalog::SubscriptionDescriptor,
1878) {
1879    let auth_store = match runtime.inner.auth_store.read().clone() {
1880        Some(store) if store.iam_authorization_enabled() => store,
1881        _ => return,
1882    };
1883    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1884        Some(identity) => identity,
1885        None => return,
1886    };
1887    let tenant = crate::runtime::impl_core::current_tenant();
1888    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1889    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
1890    if missing.is_empty() {
1891        return;
1892    }
1893
1894    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
1895    tracing::warn!(
1896        target: "reddb::operator",
1897        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
1898        source,
1899        subscription.target_queue,
1900        columns
1901    );
1902    let mut event = AuditEvent::builder("subscription_redact_gap")
1903        .principal(username)
1904        .source(AuditAuthSource::System)
1905        .resource(format!(
1906            "subscription:{}->{}",
1907            source, subscription.target_queue
1908        ))
1909        .outcome(Outcome::Success)
1910        .field(AuditFieldEscaper::field("source", source))
1911        .field(AuditFieldEscaper::field(
1912            "target_queue",
1913            subscription.target_queue.clone(),
1914        ))
1915        .field(AuditFieldEscaper::field(
1916            "subscription",
1917            subscription.name.clone(),
1918        ))
1919        .field(AuditFieldEscaper::field("columns", columns))
1920        .field(AuditFieldEscaper::field("role", role.as_str()));
1921    if let Some(t) = tenant {
1922        event = event.tenant(t);
1923    }
1924    runtime.inner.audit_log.record_event(event.build());
1925}
1926
1927fn subscription_redact_gap_columns(
1928    auth_store: &crate::auth::store::AuthStore,
1929    principal: &crate::auth::UserId,
1930    source: &str,
1931    subscription: &crate::catalog::SubscriptionDescriptor,
1932) -> BTreeSet<String> {
1933    let redacted: HashSet<String> = subscription
1934        .redact_fields
1935        .iter()
1936        .map(|field| field.to_ascii_lowercase())
1937        .collect();
1938    auth_store
1939        .effective_policies(principal)
1940        .iter()
1941        .flat_map(|policy| policy.statements.iter())
1942        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
1943        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
1944        .flat_map(|statement| statement.resources.iter())
1945        .filter_map(|resource| denied_column_for_source(resource, source))
1946        .filter(|column| !redact_covers_column(&redacted, source, column))
1947        .collect()
1948}
1949
1950fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
1951    match pattern {
1952        crate::auth::policies::ActionPattern::Wildcard => true,
1953        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
1954        crate::auth::policies::ActionPattern::Prefix(prefix) => {
1955            "select".len() > prefix.len() + 1
1956                && "select".starts_with(prefix)
1957                && "select".as_bytes()[prefix.len()] == b':'
1958        }
1959    }
1960}
1961
1962fn denied_column_for_source(
1963    resource: &crate::auth::policies::ResourcePattern,
1964    source: &str,
1965) -> Option<String> {
1966    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
1967        return None;
1968    };
1969    if kind != "column" {
1970        return None;
1971    }
1972    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
1973    (column.table_resource_name() == source).then_some(column.column)
1974}
1975
/// Return whether a set of lowercased redact fields covers `column` of
/// table `source`.
///
/// A column is covered by any of these entries:
///
/// * `*`;
/// * the bare column name;
/// * the `source.column` qualified name.
///
/// Both names are lowercased before the lookup, so `redacted` is expected to
/// hold lowercase entries.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let lowered = column.to_ascii_lowercase();
    if redacted.contains(&lowered) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), lowered);
    redacted.contains(&qualified)
}
1981
1982fn subscription_would_create_cycle(
1983    db: &crate::storage::unified::devx::RedDB,
1984    source: &str,
1985    target: &str,
1986) -> bool {
1987    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
1988    for contract in db.collection_contracts() {
1989        for subscription in contract
1990            .subscriptions
1991            .into_iter()
1992            .filter(|subscription| subscription.enabled)
1993        {
1994            graph
1995                .entry(subscription.source)
1996                .or_default()
1997                .push(subscription.target_queue);
1998        }
1999    }
2000    graph
2001        .entry(source.to_string())
2002        .or_default()
2003        .push(target.to_string());
2004
2005    let mut stack = vec![target.to_string()];
2006    let mut seen = HashSet::new();
2007    while let Some(node) = stack.pop() {
2008        if node == source {
2009            return true;
2010        }
2011        if !seen.insert(node.clone()) {
2012            continue;
2013        }
2014        if let Some(next) = graph.get(&node) {
2015            stack.extend(next.iter().cloned());
2016        }
2017    }
2018    false
2019}
2020
/// Crate-visible entry point for [`ensure_event_target_queue`].
///
/// This is a thin wrapper that delegates unchanged. It creates `queue`, with
/// its implicit queue contract and fan-out mode, only if no collection of
/// that name exists yet.
pub(crate) fn ensure_event_target_queue_pub(
    runtime: &RedDBRuntime,
    queue: &str,
) -> RedDBResult<()> {
    ensure_event_target_queue(runtime, queue)
}
2027
2028fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2029    let store = runtime.inner.db.store();
2030    if store.get_collection(queue).is_some() {
2031        return Ok(());
2032    }
2033    store
2034        .create_collection(queue)
2035        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2036    runtime
2037        .inner
2038        .db
2039        .save_collection_contract(event_queue_collection_contract(queue))
2040        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2041    store.set_config_tree(
2042        &format!("queue.{queue}.mode"),
2043        &crate::serde_json::Value::String("fanout".to_string()),
2044    );
2045    Ok(())
2046}
2047
2048fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2049    let now = current_unix_ms();
2050    crate::physical::CollectionContract {
2051        name: queue.to_string(),
2052        declared_model: crate::catalog::CollectionModel::Queue,
2053        schema_mode: crate::catalog::SchemaMode::Dynamic,
2054        origin: crate::physical::ContractOrigin::Implicit,
2055        version: 1,
2056        created_at_unix_ms: now,
2057        updated_at_unix_ms: now,
2058        default_ttl_ms: None,
2059        vector_dimension: None,
2060        vector_metric: None,
2061        context_index_fields: Vec::new(),
2062        declared_columns: Vec::new(),
2063        table_def: None,
2064        timestamps_enabled: false,
2065        context_index_enabled: false,
2066        metrics_raw_retention_ms: None,
2067        metrics_rollup_policies: Vec::new(),
2068        metrics_tenant_identity: None,
2069        metrics_namespace: None,
2070        append_only: true,
2071        subscriptions: Vec::new(),
2072    }
2073}
2074
2075fn build_table_def_from_create_table(
2076    query: &CreateTableQuery,
2077) -> RedDBResult<crate::storage::schema::TableDef> {
2078    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2079    for column in &query.columns {
2080        if column.primary_key {
2081            table.primary_key.push(column.name.clone());
2082            table.constraints.push(
2083                crate::storage::schema::Constraint::new(
2084                    format!("pk_{}", column.name),
2085                    crate::storage::schema::ConstraintType::PrimaryKey,
2086                )
2087                .on_columns(vec![column.name.clone()]),
2088            );
2089        }
2090        if column.unique {
2091            table.constraints.push(
2092                crate::storage::schema::Constraint::new(
2093                    format!("uniq_{}", column.name),
2094                    crate::storage::schema::ConstraintType::Unique,
2095                )
2096                .on_columns(vec![column.name.clone()]),
2097            );
2098        }
2099        if column.not_null {
2100            table.constraints.push(
2101                crate::storage::schema::Constraint::new(
2102                    format!("not_null_{}", column.name),
2103                    crate::storage::schema::ConstraintType::NotNull,
2104                )
2105                .on_columns(vec![column.name.clone()]),
2106            );
2107        }
2108        table.columns.push(column_def_from_ddl(column)?);
2109    }
2110    // WITH timestamps = true: append the two runtime-managed columns
2111    // to the schema so resolved_contract_columns exposes them to the
2112    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
2113    // not-nullable; the write path auto-fills them.
2114    if query.timestamps {
2115        table.columns.push(
2116            crate::storage::schema::ColumnDef::new(
2117                "created_at".to_string(),
2118                crate::storage::schema::DataType::UnsignedInteger,
2119            )
2120            .not_null(),
2121        );
2122        table.columns.push(
2123            crate::storage::schema::ColumnDef::new(
2124                "updated_at".to_string(),
2125                crate::storage::schema::DataType::UnsignedInteger,
2126            )
2127            .not_null(),
2128        );
2129        table.constraints.push(
2130            crate::storage::schema::Constraint::new(
2131                "not_null_created_at".to_string(),
2132                crate::storage::schema::ConstraintType::NotNull,
2133            )
2134            .on_columns(vec!["created_at".to_string()]),
2135        );
2136        table.constraints.push(
2137            crate::storage::schema::Constraint::new(
2138                "not_null_updated_at".to_string(),
2139                crate::storage::schema::ConstraintType::NotNull,
2140            )
2141            .on_columns(vec!["updated_at".to_string()]),
2142        );
2143    }
2144    table
2145        .validate()
2146        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2147    Ok(table)
2148}
2149
2150fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2151    let data_type = resolve_declared_data_type(&column.data_type)
2152        .map_err(|err| RedDBError::Query(err.to_string()))?;
2153    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2154    if column.not_null {
2155        column_def = column_def.not_null();
2156    }
2157    if let Some(default) = &column.default {
2158        column_def = column_def.with_default(default.as_bytes().to_vec());
2159    }
2160    if column.compress.unwrap_or(0) > 0 {
2161        column_def = column_def.compressed();
2162    }
2163    if !column.enum_variants.is_empty() {
2164        column_def = column_def.with_variants(column.enum_variants.clone());
2165    }
2166    if let Some(precision) = column.decimal_precision {
2167        column_def = column_def.with_precision(precision);
2168    }
2169    if let Some(element_type) = &column.array_element {
2170        column_def = column_def.with_element_type(
2171            resolve_declared_data_type(element_type)
2172                .map_err(|err| RedDBError::Query(err.to_string()))?,
2173        );
2174    }
2175    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2176    if column.unique {
2177        column_def = column_def.with_metadata("unique", "true");
2178    }
2179    if column.primary_key {
2180        column_def = column_def.with_metadata("primary_key", "true");
2181    }
2182    Ok(column_def)
2183}
2184
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2191
2192#[cfg(test)]
2193mod tests {
2194    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2195    use crate::auth::store::{AuthStore, PrincipalRef};
2196    use crate::auth::UserId;
2197    use crate::auth::{AuthConfig, Role};
2198    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2199    use crate::storage::schema::Value;
2200    use crate::{RedDBOptions, RedDBRuntime};
2201    use std::sync::Arc;
2202
2203    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2204        Policy {
2205            id: id.to_string(),
2206            version: 1,
2207            tenant: None,
2208            created_at: 0,
2209            updated_at: 0,
2210            statements: vec![Statement {
2211                sid: None,
2212                effect: Effect::Allow,
2213                actions: vec![ActionPattern::Exact(action.to_string())],
2214                resources: vec![ResourcePattern::Exact {
2215                    kind: "collection".to_string(),
2216                    name: collection.to_string(),
2217                }],
2218                condition: None,
2219            }],
2220        }
2221    }
2222
2223    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2224        let store = Arc::new(AuthStore::new(AuthConfig::default()));
2225        *rt.inner.auth_store.write() = Some(store.clone());
2226        store
2227    }
2228
    /// `DROP TABLE` must be rejected by IAM when the caller's only attached
    /// policy allows `select` (on any resource) but says nothing about `drop`.
    #[test]
    fn drop_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
        let select_only = Policy {
            id: "select-only".to_string(),
            version: 1,
            tenant: None,
            created_at: 0,
            updated_at: 0,
            statements: vec![Statement {
                sid: None,
                effect: Effect::Allow,
                actions: vec![ActionPattern::Exact("select".to_string())],
                resources: vec![ResourcePattern::Wildcard],
                condition: None,
            }],
        };
        store.put_policy_internal(select_only).unwrap();
        let alice = UserId::from_parts(None, "alice");
        store
            .attach_policy(PrincipalRef::User(alice), "select-only")
            .unwrap();
        set_current_auth_identity("alice".to_string(), Role::Write);
        // NOTE(review): if the DROP unexpectedly succeeds, `unwrap_err` panics
        // before the identity is cleared below.
        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
        clear_current_auth_identity();
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }
2262
    /// `DROP TABLE` succeeds when the caller has an IAM policy that allows
    /// `drop` on exactly that collection.
    #[test]
    fn drop_allowed_with_explicit_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        // Allow `drop` on the `collection` resource named `bar` only.
        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
        store.put_policy_internal(policy).unwrap();
        let bob = UserId::from_parts(None, "bob");
        store
            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
            .unwrap();
        set_current_auth_identity("bob".to_string(), Role::Write);
        rt.execute_query("DROP TABLE bar").unwrap();
        clear_current_auth_identity();
    }
2278
2279    #[test]
2280    fn drop_allowed_with_wildcard_iam_policy() {
2281        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2282        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2283        let store = wire_auth_store(&rt);
2284        let policy = Policy {
2285            id: "allow-drop-all".to_string(),
2286            version: 1,
2287            tenant: None,
2288            created_at: 0,
2289            updated_at: 0,
2290            statements: vec![Statement {
2291                sid: None,
2292                effect: Effect::Allow,
2293                actions: vec![ActionPattern::Exact("drop".to_string())],
2294                resources: vec![ResourcePattern::Wildcard],
2295                condition: None,
2296            }],
2297        };
2298        store.put_policy_internal(policy).unwrap();
2299        let carl = UserId::from_parts(None, "carl");
2300        store
2301            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2302            .unwrap();
2303        set_current_auth_identity("carl".to_string(), Role::Write);
2304        rt.execute_query("DROP TABLE baz").unwrap();
2305        clear_current_auth_identity();
2306    }
2307
2308    #[test]
2309    fn truncate_denied_without_iam_policy() {
2310        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2311        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2312        let store = wire_auth_store(&rt);
2313        // A policy exists (IAM active) but gives no truncate right.
2314        let select_only = Policy {
2315            id: "select-only-2".to_string(),
2316            version: 1,
2317            tenant: None,
2318            created_at: 0,
2319            updated_at: 0,
2320            statements: vec![Statement {
2321                sid: None,
2322                effect: Effect::Allow,
2323                actions: vec![ActionPattern::Exact("select".to_string())],
2324                resources: vec![ResourcePattern::Wildcard],
2325                condition: None,
2326            }],
2327        };
2328        store.put_policy_internal(select_only).unwrap();
2329        let dana = UserId::from_parts(None, "dana");
2330        store
2331            .attach_policy(PrincipalRef::User(dana), "select-only-2")
2332            .unwrap();
2333        set_current_auth_identity("dana".to_string(), Role::Write);
2334        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2335        clear_current_auth_identity();
2336        assert!(
2337            format!("{err}").contains("denied by IAM policy"),
2338            "got: {err}"
2339        );
2340    }
2341
2342    #[test]
2343    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2344        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2345        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2346            .unwrap();
2347        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2348            .unwrap();
2349        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2350            .unwrap();
2351
2352        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2353        assert_eq!(truncated.statement_type, "truncate");
2354        assert_eq!(truncated.affected_rows, 0);
2355
2356        let empty = rt.execute_query("SELECT id FROM users").unwrap();
2357        assert!(empty.result.records.is_empty());
2358
2359        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2360            .unwrap();
2361        let selected = rt
2362            .execute_query("SELECT name FROM users WHERE id = 3")
2363            .unwrap();
2364        let name = selected.result.records[0].get("name").unwrap();
2365        assert_eq!(name, &Value::text("cy"));
2366        assert!(rt.db().collection_contract("users").is_some());
2367        assert!(rt
2368            .inner
2369            .index_store
2370            .list_indices("users")
2371            .iter()
2372            .any(|index| index.name == "idx_users_id"));
2373    }
2374
2375    #[test]
2376    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2377        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2378        rt.execute_query("CREATE QUEUE tasks").unwrap();
2379        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2380
2381        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2382        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2383
2384        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2385        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2386        assert_eq!(
2387            len.result.records[0].get("len"),
2388            Some(&Value::UnsignedInteger(0))
2389        );
2390    }
2391
2392    #[test]
2393    fn truncate_system_schema_is_read_only() {
2394        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2395        let err = rt
2396            .execute_query("TRUNCATE COLLECTION red.collections")
2397            .unwrap_err();
2398        assert!(format!("{err}").contains("system schema is read-only"));
2399    }
2400
2401    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────
2402
2403    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2404        let result = rt
2405            .execute_query(&format!("QUEUE PEEK {queue} 100"))
2406            .expect("peek queue");
2407        result
2408            .result
2409            .records
2410            .iter()
2411            .map(
2412                |record| match record.get("payload").expect("payload column") {
2413                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2414                    other => panic!("expected JSON queue payload, got {other:?}"),
2415                },
2416            )
2417            .collect()
2418    }
2419
2420    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
2421    /// `truncate` event, not one delete event per row.
2422    #[test]
2423    fn truncate_event_enabled_table_emits_single_truncate_event() {
2424        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2425        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2426            .unwrap();
2427        rt.execute_query(
2428            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2429        )
2430        .unwrap();
2431
2432        // Drain the 3 insert events so we start clean.
2433        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2434
2435        rt.execute_query("TRUNCATE TABLE users").unwrap();
2436
2437        let events = queue_payloads(&rt, "users_events");
2438        // Must be exactly 1 truncate event, not 3 delete events.
2439        assert_eq!(
2440            events.len(),
2441            1,
2442            "expected 1 truncate event, got {}",
2443            events.len()
2444        );
2445        let ev = events[0].as_object().expect("event is object");
2446        assert_eq!(
2447            ev.get("op").and_then(crate::json::Value::as_str),
2448            Some("truncate")
2449        );
2450        assert_eq!(
2451            ev.get("collection").and_then(crate::json::Value::as_str),
2452            Some("users")
2453        );
2454        assert_eq!(
2455            ev.get("entities_count")
2456                .and_then(crate::json::Value::as_u64),
2457            Some(3)
2458        );
2459        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2460        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2461        assert!(ev
2462            .get("event_id")
2463            .and_then(crate::json::Value::as_str)
2464            .is_some_and(|s| !s.is_empty()));
2465    }
2466
2467    /// `TRUNCATE users` on a collection without event subscription emits no events.
2468    #[test]
2469    fn truncate_no_events_collection_emits_nothing() {
2470        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2471        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2472            .unwrap();
2473        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2474            .unwrap();
2475        // No EVENTS subscription — truncate must work without touching any queue.
2476        rt.execute_query("TRUNCATE TABLE plain").unwrap();
2477        // No crash, no queue to check. Just verify truncation happened.
2478        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2479        assert!(rows.result.records.is_empty());
2480    }
2481
2482    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
2483    /// `collection_dropped` event. The subscription is removed from the
2484    /// source contract but the target queue is preserved for consumer drain.
2485    #[test]
2486    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2487        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2488        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2489            .unwrap();
2490        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2491            .unwrap();
2492
2493        // Drain insert events so we start clean.
2494        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2495
2496        rt.execute_query("DROP TABLE users").unwrap();
2497
2498        // Queue must still exist with 1 collection_dropped event.
2499        let events = queue_payloads(&rt, "users_events");
2500        assert_eq!(
2501            events.len(),
2502            1,
2503            "expected 1 collection_dropped event, got {}",
2504            events.len()
2505        );
2506        let ev = events[0].as_object().expect("event is object");
2507        assert_eq!(
2508            ev.get("op").and_then(crate::json::Value::as_str),
2509            Some("collection_dropped")
2510        );
2511        assert_eq!(
2512            ev.get("collection").and_then(crate::json::Value::as_str),
2513            Some("users")
2514        );
2515        assert_eq!(
2516            ev.get("final_entities_count")
2517                .and_then(crate::json::Value::as_u64),
2518            Some(2)
2519        );
2520        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2521        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2522        assert!(ev
2523            .get("event_id")
2524            .and_then(crate::json::Value::as_str)
2525            .is_some_and(|s| !s.is_empty()));
2526
2527        // Source collection is gone.
2528        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2529        assert!(
2530            format!("{err}").contains("users"),
2531            "expected not-found error"
2532        );
2533    }
2534
2535    /// `DROP TABLE users` on a collection without event subscription works
2536    /// normally with no event emitted.
2537    #[test]
2538    fn drop_no_events_collection_emits_nothing() {
2539        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2540        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2541            .unwrap();
2542        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2543            .unwrap();
2544        rt.execute_query("DROP TABLE plain").unwrap();
2545        // No crash and collection is gone.
2546        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2547        assert!(format!("{err}").contains("plain"));
2548    }
2549
2550    // ── #297: ops_filter + WHERE filter ────────────────────────────────────
2551
2552    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
2553    #[test]
2554    fn ops_filter_insert_only_ignores_update_and_delete() {
2555        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2556        rt.execute_query(
2557            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2558        )
2559        .unwrap();
2560        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
2561            .unwrap();
2562        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
2563            .unwrap();
2564        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
2565
2566        let events = queue_payloads(&rt, "items_events");
2567        // Only the INSERT should have fired.
2568        assert_eq!(
2569            events.len(),
2570            1,
2571            "expected 1 insert event, got {}",
2572            events.len()
2573        );
2574        assert_eq!(
2575            events[0]
2576                .as_object()
2577                .unwrap()
2578                .get("op")
2579                .and_then(crate::json::Value::as_str),
2580            Some("insert")
2581        );
2582    }
2583
2584    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
2585    #[test]
2586    fn where_filter_skips_rows_that_do_not_match() {
2587        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2588        rt.execute_query(
2589            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
2590        )
2591        .unwrap();
2592
2593        // This row should generate an event.
2594        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
2595            .unwrap();
2596        // This row should NOT generate an event.
2597        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
2598            .unwrap();
2599
2600        let events = queue_payloads(&rt, "users_events");
2601        assert_eq!(
2602            events.len(),
2603            1,
2604            "expected 1 event (only active), got {}",
2605            events.len()
2606        );
2607        let ev = events[0].as_object().unwrap();
2608        assert_eq!(
2609            ev.get("op").and_then(crate::json::Value::as_str),
2610            Some("insert")
2611        );
2612        let after = ev.get("after").unwrap().as_object().unwrap();
2613        assert_eq!(
2614            after.get("status").and_then(crate::json::Value::as_str),
2615            Some("active")
2616        );
2617    }
2618
2619    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
2620    #[test]
2621    fn ops_filter_and_where_filter_combined() {
2622        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2623        rt.execute_query(
2624            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
2625        )
2626        .unwrap();
2627
2628        // INSERT active → event
2629        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
2630            .unwrap();
2631        // INSERT inactive → no event
2632        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
2633            .unwrap();
2634        // UPDATE row 1 to inactive → after = inactive, no event
2635        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
2636            .unwrap();
2637        // DELETE → ops_filter excludes it
2638        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
2639
2640        let events = queue_payloads(&rt, "items_events");
2641        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
2642        assert_eq!(
2643            events.len(),
2644            1,
2645            "expected 1 event, got {}: {events:?}",
2646            events.len()
2647        );
2648        assert_eq!(
2649            events[0]
2650                .as_object()
2651                .unwrap()
2652                .get("op")
2653                .and_then(crate::json::Value::as_str),
2654            Some("insert")
2655        );
2656    }
2657
2658    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
2659    #[test]
2660    fn where_filter_on_delete_checks_before_state() {
2661        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2662        rt.execute_query(
2663            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
2664        )
2665        .unwrap();
2666
2667        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
2668            .unwrap();
2669
2670        // Delete active row → event (before-state was active)
2671        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
2672        // Delete inactive row → no event (before-state was inactive)
2673        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
2674
2675        let events = queue_payloads(&rt, "users_events");
2676        assert_eq!(
2677            events.len(),
2678            1,
2679            "expected 1 delete event, got {}",
2680            events.len()
2681        );
2682        let ev = events[0].as_object().unwrap();
2683        assert_eq!(
2684            ev.get("op").and_then(crate::json::Value::as_str),
2685            Some("delete")
2686        );
2687    }
2688
2689    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────
2690
2691    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
2692    #[test]
2693    fn alter_add_column_on_event_enabled_table_succeeds() {
2694        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2695        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2696            .unwrap();
2697        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
2698        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
2699            .unwrap();
2700        // The column is now in the contract.
2701        let contract = rt.db().collection_contract("users").unwrap();
2702        assert!(
2703            contract.declared_columns.iter().any(|c| c.name == "phone"),
2704            "phone column should be in contract"
2705        );
2706        // Subscription still enabled after the alter.
2707        assert!(
2708            contract.subscriptions.iter().any(|s| s.enabled),
2709            "subscription should remain enabled"
2710        );
2711    }
2712
2713    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
2714    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
2715    #[test]
2716    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
2717        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2718        rt.execute_query(
2719            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
2720        )
2721        .unwrap();
2722        // DROP COLUMN — schema change event path exercises, must not error.
2723        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
2724            .unwrap();
2725        let contract = rt.db().collection_contract("items").unwrap();
2726        assert!(
2727            !contract.declared_columns.iter().any(|c| c.name == "secret"),
2728            "secret column should be removed"
2729        );
2730        // ENABLE RLS — non-column op, no schema-change event (coverage).
2731        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
2732            .unwrap();
2733        // Collection and subscription still intact.
2734        assert!(
2735            contract.subscriptions.iter().any(|s| s.enabled),
2736            "subscription should remain enabled"
2737        );
2738    }
2739}