Skip to main content

reddb_server/runtime/
impl_ddl.rs

//! DDL execution: CREATE TABLE, DROP TABLE, and ALTER TABLE driven by the SQL AST.
//!
//! Translates DDL statements into collection-level operations on the
//! underlying `UnifiedStore`. RedDB uses a flexible schema-on-read
//! model, so column definitions are advisory metadata rather than
//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault KV key that holds a collection's own master key.
///
/// The result has the shape `red.vault.<collection>.master_key`; the
/// collection name is inserted verbatim.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        // Check if the collection already exists.
37        let exists = store.get_collection(&query.name).is_some();
38        if exists {
39            if query.if_not_exists {
40                return Ok(RuntimeQueryResult::ok_message(
41                    raw_query.to_string(),
42                    &format!("table '{}' already exists", query.name),
43                    "create",
44                ));
45            }
46            return Err(RedDBError::Query(format!(
47                "table '{}' already exists",
48                query.name
49            )));
50        }
51
52        // Build and validate the contract before mutating storage so invalid
53        // SQL types / duplicate columns do not leave partial side effects.
54        let contract = collection_contract_from_create_table(query)?;
55        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
56        // Create the collection.
57        store
58            .create_collection(&query.name)
59            .map_err(|err| RedDBError::Internal(err.to_string()))?;
60        for subscription in &contract.subscriptions {
61            ensure_event_target_queue(self, &subscription.target_queue)?;
62        }
63        if let Some(default_ttl_ms) = query.default_ttl_ms {
64            self.inner
65                .db
66                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
67        }
68        self.inner
69            .db
70            .save_collection_contract(contract)
71            .map_err(|err| RedDBError::Internal(err.to_string()))?;
72        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
73            store.set_config_tree(
74                &format!("red.collection_tenants.{}", query.name),
75                &crate::serde_json::Value::String(tenant_id),
76            );
77        }
78        self.inner
79            .db
80            .persist_metadata()
81            .map_err(|err| RedDBError::Internal(err.to_string()))?;
82        self.refresh_table_planner_stats(&query.name);
83        self.invalidate_result_cache();
84        // Issue #120 — feed the create into the schema-vocabulary
85        // reverse index so AskPipeline (#121) sees this collection.
86        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
87        self.schema_vocabulary_apply(
88            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
89                collection: query.name.clone(),
90                columns,
91                type_tags: Vec::new(),
92                description: None,
93            },
94        );
95        // Partition metadata (Phase 2.2 PG parity).
96        //
97        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
98        // clause, stamp the partition config into `red_config` under
99        // `partition.{table}.{by,column}`. Children are registered separately
100        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
101        if let Some(spec) = &query.partition_by {
102            let kind_str = match spec.kind {
103                crate::storage::query::ast::PartitionKind::Range => "range",
104                crate::storage::query::ast::PartitionKind::List => "list",
105                crate::storage::query::ast::PartitionKind::Hash => "hash",
106            };
107            store.set_config_tree(
108                &format!("partition.{}.by", query.name),
109                &crate::serde_json::Value::String(kind_str.to_string()),
110            );
111            store.set_config_tree(
112                &format!("partition.{}.column", query.name),
113                &crate::serde_json::Value::String(spec.column.clone()),
114            );
115        }
116
117        // Table-scoped multi-tenancy (Phase 2.5.4).
118        //
119        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
120        //   1. Persists the `tenant_tables.{table}.column` marker so
121        //      INSERTs can auto-fill and future opens re-hydrate.
122        //   2. Registers the table in the in-memory `tenant_tables`
123        //      HashMap used by the DML auto-fill path.
124        //   3. Installs an implicit RLS policy equivalent to
125        //      `USING (col = CURRENT_TENANT())` across all actions.
126        //   4. Flips `rls_enabled_tables` on so the policy applies.
127        if let Some(col) = &query.tenant_by {
128            store.set_config_tree(
129                &format!("tenant_tables.{}.column", query.name),
130                &crate::serde_json::Value::String(col.clone()),
131            );
132            self.register_tenant_table(&query.name, col);
133        }
134
135        let ttl_suffix = query
136            .default_ttl_ms
137            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
138            .unwrap_or_default();
139
140        let tenant_suffix = query
141            .tenant_by
142            .as_ref()
143            .map(|col| format!(" (tenant-scoped by {col})"))
144            .unwrap_or_default();
145
146        Ok(RuntimeQueryResult::ok_message(
147            raw_query.to_string(),
148            &format!(
149                "table '{}' created{}{}",
150                query.name, ttl_suffix, tenant_suffix
151            ),
152            "create",
153        ))
154    }
155
156    fn execute_create_keyed_collection(
157        &self,
158        raw_query: &str,
159        query: &CreateTableQuery,
160    ) -> RedDBResult<RuntimeQueryResult> {
161        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
162        if is_system_schema_name(&query.name) {
163            return Err(RedDBError::Query("system schema is read-only".to_string()));
164        }
165        let store = self.inner.db.store();
166        let label = polymorphic_resolver::model_name(query.collection_model);
167        if store.get_collection(&query.name).is_some() {
168            if query.if_not_exists {
169                return Ok(RuntimeQueryResult::ok_message(
170                    raw_query.to_string(),
171                    &format!("{label} '{}' already exists", query.name),
172                    "create",
173                ));
174            }
175            return Err(RedDBError::Query(format!(
176                "{label} '{}' already exists",
177                query.name
178            )));
179        }
180
181        store
182            .create_collection(&query.name)
183            .map_err(|err| RedDBError::Internal(err.to_string()))?;
184        if query.collection_model == CollectionModel::Vault {
185            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
186            let key_scope = if query.vault_own_master_key {
187                "own"
188            } else {
189                "cluster"
190            };
191            store.set_config_tree(
192                &format!("red.vault.{}.key_scope", query.name),
193                &crate::serde_json::Value::String(key_scope.to_string()),
194            );
195            store.set_config_tree(
196                &format!("red.vault.{}.status", query.name),
197                &crate::serde_json::Value::String("sealed".to_string()),
198            );
199        }
200        self.inner
201            .db
202            .save_collection_contract(keyed_collection_contract(
203                &query.name,
204                query.collection_model,
205            ))
206            .map_err(|err| RedDBError::Internal(err.to_string()))?;
207        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
208            store.set_config_tree(
209                &format!("red.collection_tenants.{}", query.name),
210                &crate::serde_json::Value::String(tenant_id),
211            );
212        }
213        self.inner
214            .db
215            .persist_metadata()
216            .map_err(|err| RedDBError::Internal(err.to_string()))?;
217        self.invalidate_result_cache();
218
219        Ok(RuntimeQueryResult::ok_message(
220            raw_query.to_string(),
221            &format!("{label} '{}' created", query.name),
222            "create",
223        ))
224    }
225
226    fn provision_vault_key_material(
227        &self,
228        collection: &str,
229        own_master_key: bool,
230    ) -> RedDBResult<()> {
231        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
232            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
233        })?;
234        if !auth_store.is_vault_backed() {
235            return Err(RedDBError::Query(
236                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
237            ));
238        }
239
240        if auth_store.vault_secret_key().is_none() {
241            let key = crate::auth::store::random_bytes(32);
242            auth_store
243                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
244                .map_err(|err| RedDBError::Query(err.to_string()))?;
245        }
246
247        if own_master_key {
248            let key = crate::auth::store::random_bytes(32);
249            auth_store
250                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
251                .map_err(|err| RedDBError::Query(err.to_string()))?;
252        }
253
254        Ok(())
255    }
256
257    /// Execute DROP TABLE
258    ///
259    /// Drops the collection and all its data from the store.
260    pub fn execute_drop_table(
261        &self,
262        raw_query: &str,
263        query: &DropTableQuery,
264    ) -> RedDBResult<RuntimeQueryResult> {
265        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
266        let store = self.inner.db.store();
267
268        if is_system_schema_name(&query.name) {
269            return Err(RedDBError::Query("system schema is read-only".to_string()));
270        }
271
272        let exists = store.get_collection(&query.name).is_some();
273        if !exists {
274            if query.if_exists {
275                return Ok(RuntimeQueryResult::ok_message(
276                    raw_query.to_string(),
277                    &format!("table '{}' does not exist", query.name),
278                    "drop",
279                ));
280            }
281            return Err(RedDBError::NotFound(format!(
282                "table '{}' not found",
283                query.name
284            )));
285        }
286        let actual =
287            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
288        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
289
290        // Emit 1 collection_dropped event before storage is wiped.
291        // Queue is preserved; subscription is removed with the contract below.
292        let final_count = store
293            .get_collection(&query.name)
294            .map(|manager| manager.query_all(|_| true).len() as u64)
295            .unwrap_or(0);
296        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
297            self,
298            &query.name,
299            final_count,
300        )?;
301
302        let orphaned_indices: Vec<String> = self
303            .inner
304            .index_store
305            .list_indices(&query.name)
306            .into_iter()
307            .map(|index| index.name)
308            .collect();
309        for name in &orphaned_indices {
310            self.inner.index_store.drop_index(name, &query.name);
311        }
312
313        store
314            .drop_collection(&query.name)
315            .map_err(|err| RedDBError::Internal(err.to_string()))?;
316        self.inner.db.invalidate_vector_index(&query.name);
317        self.inner.db.clear_collection_default_ttl_ms(&query.name);
318        self.inner
319            .db
320            .remove_collection_contract(&query.name)
321            .map_err(|err| RedDBError::Internal(err.to_string()))?;
322        self.clear_table_planner_stats(&query.name);
323        self.invalidate_result_cache();
324        // Issue #119: a dropped collection vanishes from every
325        // (tenant, role)'s visible-collections set. Auth is optional in
326        // embedded mode so guard the call.
327        if let Some(store) = self.inner.auth_store.read().clone() {
328            store.invalidate_visible_collections_cache();
329        }
330        self.inner
331            .db
332            .persist_metadata()
333            .map_err(|err| RedDBError::Internal(err.to_string()))?;
334        // Issue #120 — drop both the collection entries *and* every
335        // index entry that was scoped to this collection.  Dropping the
336        // collection wipes columns + collection-name + type-tags +
337        // index hits in one pass via `purge_collection_entries`, so
338        // the explicit `DropIndex` calls would be redundant.
339        self.schema_vocabulary_apply(
340            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
341                collection: query.name.clone(),
342            },
343        );
344
345        Ok(RuntimeQueryResult::ok_message(
346            raw_query.to_string(),
347            &format!("table '{}' dropped", query.name),
348            "drop",
349        ))
350    }
351
352    pub fn execute_drop_graph(
353        &self,
354        raw_query: &str,
355        query: &DropGraphQuery,
356    ) -> RedDBResult<RuntimeQueryResult> {
357        self.execute_drop_typed_collection(
358            raw_query,
359            &query.name,
360            query.if_exists,
361            CollectionModel::Graph,
362            "graph",
363        )
364    }
365
366    pub fn execute_drop_vector(
367        &self,
368        raw_query: &str,
369        query: &DropVectorQuery,
370    ) -> RedDBResult<RuntimeQueryResult> {
371        self.execute_drop_typed_collection(
372            raw_query,
373            &query.name,
374            query.if_exists,
375            CollectionModel::Vector,
376            "vector",
377        )
378    }
379
380    pub fn execute_drop_document(
381        &self,
382        raw_query: &str,
383        query: &DropDocumentQuery,
384    ) -> RedDBResult<RuntimeQueryResult> {
385        self.execute_drop_typed_collection(
386            raw_query,
387            &query.name,
388            query.if_exists,
389            CollectionModel::Document,
390            "document",
391        )
392    }
393
394    pub fn execute_drop_kv(
395        &self,
396        raw_query: &str,
397        query: &DropKvQuery,
398    ) -> RedDBResult<RuntimeQueryResult> {
399        let label = polymorphic_resolver::model_name(query.model);
400        self.execute_drop_typed_collection(
401            raw_query,
402            &query.name,
403            query.if_exists,
404            query.model,
405            label,
406        )
407    }
408
409    pub fn execute_drop_collection(
410        &self,
411        raw_query: &str,
412        query: &DropCollectionQuery,
413    ) -> RedDBResult<RuntimeQueryResult> {
414        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
415        if is_system_schema_name(&query.name) {
416            return Err(RedDBError::Query("system schema is read-only".to_string()));
417        }
418        let store = self.inner.db.store();
419        if store.get_collection(&query.name).is_none() {
420            if query.if_exists {
421                return Ok(RuntimeQueryResult::ok_message(
422                    raw_query.to_string(),
423                    &format!("collection '{}' does not exist", query.name),
424                    "drop",
425                ));
426            }
427            return Err(RedDBError::NotFound(format!(
428                "collection '{}' not found",
429                query.name
430            )));
431        }
432
433        match polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())? {
434            CollectionModel::Table => self.execute_drop_table(
435                raw_query,
436                &DropTableQuery {
437                    name: query.name.clone(),
438                    if_exists: query.if_exists,
439                },
440            ),
441            CollectionModel::TimeSeries => self.execute_drop_timeseries(
442                raw_query,
443                &DropTimeSeriesQuery {
444                    name: query.name.clone(),
445                    if_exists: query.if_exists,
446                },
447            ),
448            CollectionModel::Queue => self.execute_drop_queue(
449                raw_query,
450                &DropQueueQuery {
451                    name: query.name.clone(),
452                    if_exists: query.if_exists,
453                },
454            ),
455            CollectionModel::Graph => self.execute_drop_graph(
456                raw_query,
457                &DropGraphQuery {
458                    name: query.name.clone(),
459                    if_exists: query.if_exists,
460                },
461            ),
462            CollectionModel::Vector => self.execute_drop_vector(
463                raw_query,
464                &DropVectorQuery {
465                    name: query.name.clone(),
466                    if_exists: query.if_exists,
467                },
468            ),
469            CollectionModel::Document => self.execute_drop_document(
470                raw_query,
471                &DropDocumentQuery {
472                    name: query.name.clone(),
473                    if_exists: query.if_exists,
474                },
475            ),
476            CollectionModel::Kv => self.execute_drop_kv(
477                raw_query,
478                &DropKvQuery {
479                    name: query.name.clone(),
480                    if_exists: query.if_exists,
481                    model: CollectionModel::Kv,
482                },
483            ),
484            CollectionModel::Config => self.execute_drop_kv(
485                raw_query,
486                &DropKvQuery {
487                    name: query.name.clone(),
488                    if_exists: query.if_exists,
489                    model: CollectionModel::Config,
490                },
491            ),
492            CollectionModel::Vault => self.execute_drop_kv(
493                raw_query,
494                &DropKvQuery {
495                    name: query.name.clone(),
496                    if_exists: query.if_exists,
497                    model: CollectionModel::Vault,
498                },
499            ),
500            CollectionModel::Mixed => self.execute_drop_typed_collection(
501                raw_query,
502                &query.name,
503                query.if_exists,
504                CollectionModel::Mixed,
505                "collection",
506            ),
507        }
508    }
509
510    /// Execute ALTER TABLE
511    ///
512    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
513    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
514    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
515    pub fn execute_alter_table(
516        &self,
517        raw_query: &str,
518        query: &AlterTableQuery,
519    ) -> RedDBResult<RuntimeQueryResult> {
520        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
521        let store = self.inner.db.store();
522
523        // Verify the table exists.
524        if store.get_collection(&query.name).is_none() {
525            return Err(RedDBError::NotFound(format!(
526                "table '{}' not found",
527                query.name
528            )));
529        }
530
531        let mut messages = Vec::new();
532
533        // Collect column-level changes upfront for schema-change event emission below.
534        let fields_added: Vec<String> = query
535            .operations
536            .iter()
537            .filter_map(|op| {
538                if let AlterOperation::AddColumn(col) = op {
539                    Some(col.name.clone())
540                } else {
541                    None
542                }
543            })
544            .collect();
545        let fields_removed: Vec<String> = query
546            .operations
547            .iter()
548            .filter_map(|op| {
549                if let AlterOperation::DropColumn(name) = op {
550                    Some(name.clone())
551                } else {
552                    None
553                }
554            })
555            .collect();
556
557        for op in &query.operations {
558            match op {
559                AlterOperation::AddColumn(col) => {
560                    // Schema-on-read: column will be available on next insert.
561                    messages.push(format!("column '{}' added", col.name));
562                }
563                AlterOperation::DropColumn(name) => {
564                    messages.push(format!("column '{}' dropped", name));
565                }
566                AlterOperation::RenameColumn { from, to } => {
567                    messages.push(format!("column '{}' renamed to '{}'", from, to));
568                }
569                AlterOperation::AttachPartition { child, bound } => {
570                    // Persist child → parent binding in red_config so the
571                    // future planner-side pruner can enumerate children and
572                    // evaluate their bounds.
573                    store.set_config_tree(
574                        &format!("partition.{}.children.{}", query.name, child),
575                        &crate::serde_json::Value::String(bound.clone()),
576                    );
577                    messages.push(format!(
578                        "partition '{child}' attached to '{}' ({bound})",
579                        query.name
580                    ));
581                }
582                AlterOperation::DetachPartition { child } => {
583                    store.set_config_tree(
584                        &format!("partition.{}.children.{}", query.name, child),
585                        &crate::serde_json::Value::Null,
586                    );
587                    messages.push(format!(
588                        "partition '{child}' detached from '{}'",
589                        query.name
590                    ));
591                }
592                AlterOperation::EnableRowLevelSecurity => {
593                    self.inner
594                        .rls_enabled_tables
595                        .write()
596                        .insert(query.name.clone());
597                    // Persist flag so RLS survives restart via red_config.
598                    store.set_config_tree(
599                        &format!("rls.enabled.{}", query.name),
600                        &crate::serde_json::Value::Bool(true),
601                    );
602                    self.invalidate_plan_cache();
603                    messages.push(format!("row level security enabled on '{}'", query.name));
604                }
605                AlterOperation::DisableRowLevelSecurity => {
606                    self.inner.rls_enabled_tables.write().remove(&query.name);
607                    store.set_config_tree(
608                        &format!("rls.enabled.{}", query.name),
609                        &crate::serde_json::Value::Null,
610                    );
611                    self.invalidate_plan_cache();
612                    messages.push(format!("row level security disabled on '{}'", query.name));
613                }
614                // Phase 2.5.4: retrofit tenancy onto an existing table.
615                AlterOperation::EnableTenancy { column } => {
616                    store.set_config_tree(
617                        &format!("tenant_tables.{}.column", query.name),
618                        &crate::serde_json::Value::String(column.clone()),
619                    );
620                    self.register_tenant_table(&query.name, column);
621                    self.invalidate_plan_cache();
622                    messages.push(format!(
623                        "tenancy enabled on '{}' by column '{column}'",
624                        query.name
625                    ));
626                }
627                AlterOperation::DisableTenancy => {
628                    store.set_config_tree(
629                        &format!("tenant_tables.{}.column", query.name),
630                        &crate::serde_json::Value::Null,
631                    );
632                    self.unregister_tenant_table(&query.name);
633                    self.invalidate_plan_cache();
634                    messages.push(format!("tenancy disabled on '{}'", query.name));
635                }
636                AlterOperation::SetAppendOnly(on) => {
637                    // Contract is the single source of truth for the
638                    // UPDATE/DELETE parse-time guard. The flag lands
639                    // below via `apply_alter_operations_to_contract`;
640                    // here we only publish the human-readable message.
641                    messages.push(format!(
642                        "append_only {} on '{}'",
643                        if *on { "enabled" } else { "disabled" },
644                        query.name
645                    ));
646                }
647                AlterOperation::SetVersioned(on) => {
648                    // Opt the collection into (or out of) Git-for-Data.
649                    // Persists a row in red_vcs_settings; next AS OF /
650                    // merge / diff against this table honours the
651                    // flag. Retroactive: existing row versions whose
652                    // xmins are still pinned by commits become
653                    // reachable via AS OF COMMIT immediately.
654                    self.vcs_set_versioned(&query.name, *on)?;
655                    messages.push(format!(
656                        "versioned {} on '{}'",
657                        if *on { "enabled" } else { "disabled" },
658                        query.name
659                    ));
660                }
661                AlterOperation::EnableEvents(subscription) => {
662                    let mut subscription = subscription.clone();
663                    subscription.source = query.name.clone();
664                    validate_event_subscriptions(
665                        self,
666                        &query.name,
667                        std::slice::from_ref(&subscription),
668                    )?;
669                    ensure_event_target_queue(self, &subscription.target_queue)?;
670                    messages.push(format!(
671                        "events enabled on '{}' to '{}'",
672                        query.name, subscription.target_queue
673                    ));
674                }
675                AlterOperation::DisableEvents => {
676                    messages.push(format!("events disabled on '{}'", query.name));
677                }
678                AlterOperation::AddSubscription { name, descriptor } => {
679                    let mut sub = descriptor.clone();
680                    sub.name = name.clone();
681                    sub.source = query.name.clone();
682                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
683                    ensure_event_target_queue(self, &sub.target_queue)?;
684                    messages.push(format!(
685                        "subscription '{}' added on '{}' to '{}'",
686                        name, query.name, sub.target_queue
687                    ));
688                }
689                AlterOperation::DropSubscription { name } => {
690                    messages.push(format!(
691                        "subscription '{}' dropped on '{}'",
692                        name, query.name
693                    ));
694                }
695            }
696        }
697
698        let mut contract = self
699            .inner
700            .db
701            .collection_contract(&query.name)
702            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
703        apply_alter_operations_to_contract(&mut contract, &query.operations);
704        contract.version = contract.version.saturating_add(1);
705        contract.updated_at_unix_ms = current_unix_ms();
706        self.inner
707            .db
708            .save_collection_contract(contract)
709            .map_err(|err| RedDBError::Internal(err.to_string()))?;
710        // Issue #301 — emit OperatorEvent when column schema changes on a
711        // collection that has active event subscriptions, so operators know
712        // downstream consumers may see a different payload shape.
713        if !fields_added.is_empty() || !fields_removed.is_empty() {
714            let sub_names: Vec<String> = self
715                .inner
716                .db
717                .collection_contract(&query.name)
718                .map(|c| {
719                    c.subscriptions
720                        .iter()
721                        .filter(|s| s.enabled)
722                        .map(|s| s.name.clone())
723                        .collect()
724                })
725                .unwrap_or_default();
726            if !sub_names.is_empty() {
727                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
728                    collection: query.name.clone(),
729                    subscription_names: sub_names.join(", "),
730                    fields_added: fields_added.join(", "),
731                    fields_removed: fields_removed.join(", "),
732                    lsn: self.cdc_current_lsn(),
733                }
734                .emit_global();
735            }
736        }
737
738        self.clear_table_planner_stats(&query.name);
739        self.invalidate_result_cache();
740        // Issue #120 — refresh the schema-vocabulary entries from the
741        // post-ALTER contract. Drop+recreate inside the index keeps
742        // the invalidation guarantee complete (no stale columns from
743        // before an ALTER ... DROP COLUMN).
744        let post_alter_columns: Vec<String> = self
745            .inner
746            .db
747            .collection_contract(&query.name)
748            .map(|contract| {
749                contract
750                    .declared_columns
751                    .iter()
752                    .map(|col| col.name.clone())
753                    .collect()
754            })
755            .unwrap_or_default();
756        self.schema_vocabulary_apply(
757            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
758                collection: query.name.clone(),
759                columns: post_alter_columns,
760                type_tags: Vec::new(),
761                description: None,
762            },
763        );
764
765        let message = if messages.is_empty() {
766            format!("table '{}' altered (no operations)", query.name)
767        } else {
768            format!("table '{}' altered: {}", query.name, messages.join(", "))
769        };
770
771        Ok(RuntimeQueryResult::ok_message(
772            raw_query.to_string(),
773            &message,
774            "alter",
775        ))
776    }
777
778    /// Execute EXPLAIN ALTER FOR CREATE TABLE
779    ///
780    /// Pure read: computes the schema diff between the target table's
781    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
782    /// and returns it as SQL `ALTER TABLE` text (default) or structured
783    /// JSON. Never mutates storage.
784    pub fn execute_explain_alter(
785        &self,
786        raw_query: &str,
787        query: &ExplainAlterQuery,
788    ) -> RedDBResult<RuntimeQueryResult> {
789        // Validate the target CREATE TABLE body so syntactically valid
790        // but semantically broken targets (bad SQL types, duplicate
791        // columns) are caught here rather than inside the diff engine.
792        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
793
794        let current_contract = self.inner.db.collection_contract(&query.target.name);
795
796        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
797            .as_ref()
798            .map(|c| c.declared_columns.clone())
799            .unwrap_or_default();
800
801        let diff = super::schema_diff::compute_column_diff(
802            &query.target.name,
803            &current_columns,
804            &query.target.columns,
805        );
806
807        let rendered = match query.format {
808            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
809            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
810        };
811
812        let format_label = match query.format {
813            ExplainFormat::Sql => "sql",
814            ExplainFormat::Json => "json",
815        };
816
817        let columns = vec![
818            "table".to_string(),
819            "format".to_string(),
820            "diff".to_string(),
821        ];
822        let row = vec![
823            ("table".to_string(), Value::text(query.target.name.clone())),
824            ("format".to_string(), Value::text(format_label.to_string())),
825            ("diff".to_string(), Value::text(rendered)),
826        ];
827
828        Ok(RuntimeQueryResult::ok_records(
829            raw_query.to_string(),
830            columns,
831            vec![row],
832            "explain",
833        ))
834    }
835
836    /// Execute CREATE INDEX
837    ///
838    /// Registers a new index on a collection, builds it from existing data,
839    /// and makes it available to the query executor for O(1) lookups.
840    pub fn execute_create_index(
841        &self,
842        raw_query: &str,
843        query: &CreateIndexQuery,
844    ) -> RedDBResult<RuntimeQueryResult> {
845        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
846        let store = self.inner.db.store();
847
848        // Verify the table exists
849        let manager = store
850            .get_collection(&query.table)
851            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
852
853        let method_kind = match query.method {
854            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
855            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
856            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
857            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
858        };
859
860        // Extract fields from existing entities for indexing. Row
861        // entities may arrive in either the "named" HashMap layout
862        // (gRPC `BulkInsertBinary` path) OR the columnar layout
863        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
864        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
865        // leaves `named == None`). Prior to this commit the columnar
866        // branch returned an empty field list, so `CREATE INDEX` built
867        // a zero-entity index over HTTP-inserted data even though the
868        // data was queryable via `SELECT`.
869        let entities = manager.query_all(|_| true);
870        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
871            entities
872                .iter()
873                .map(|e| {
874                    let fields = match &e.data {
875                        crate::storage::EntityData::Row(row) => {
876                            if let Some(ref named) = row.named {
877                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
878                            } else if let Some(ref schema) = row.schema {
879                                // Columnar layout — pair each column
880                                // with its positional name from the
881                                // shared schema Arc.
882                                schema
883                                    .iter()
884                                    .zip(row.columns.iter())
885                                    .map(|(k, v)| (k.clone(), v.clone()))
886                                    .collect()
887                            } else {
888                                Vec::new()
889                            }
890                        }
891                        crate::storage::EntityData::Node(node) => node
892                            .properties
893                            .iter()
894                            .map(|(k, v)| (k.clone(), v.clone()))
895                            .collect(),
896                        _ => Vec::new(),
897                    };
898                    (e.id, fields)
899                })
900                .collect();
901
902        // Build the index
903        let indexed_count = self
904            .inner
905            .index_store
906            .create_index(
907                &query.name,
908                &query.table,
909                &query.columns,
910                method_kind,
911                query.unique,
912                &entity_fields,
913            )
914            .map_err(RedDBError::Internal)?;
915
916        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
917            &query.table,
918            &entity_fields,
919        );
920        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
921        self.invalidate_plan_cache();
922
923        // Register metadata
924        self.inner
925            .index_store
926            .register(super::index_store::RegisteredIndex {
927                name: query.name.clone(),
928                collection: query.table.clone(),
929                columns: query.columns.clone(),
930                method: method_kind,
931                unique: query.unique,
932            });
933        // Issue #120 — surface the index name + indexed columns in
934        // the schema-vocabulary so AskPipeline (#121) can resolve
935        // "the email index" back to its collection.
936        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
937            collection: query.table.clone(),
938            index: query.name.clone(),
939            columns: query.columns.clone(),
940        });
941
942        let method_str = format!("{}", query.method);
943        let unique_str = if query.unique { "unique " } else { "" };
944        let cols = query.columns.join(", ");
945
946        Ok(RuntimeQueryResult::ok_message(
947            raw_query.to_string(),
948            &format!(
949                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
950                unique_str, query.name, query.table, cols, method_str, indexed_count
951            ),
952            "create",
953        ))
954    }
955
956    /// Execute DROP INDEX
957    ///
958    /// Removes an index from a collection.
959    pub fn execute_drop_index(
960        &self,
961        raw_query: &str,
962        query: &DropIndexQuery,
963    ) -> RedDBResult<RuntimeQueryResult> {
964        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
965        let store = self.inner.db.store();
966
967        // Verify the table exists
968        if store.get_collection(&query.table).is_none() {
969            if query.if_exists {
970                return Ok(RuntimeQueryResult::ok_message(
971                    raw_query.to_string(),
972                    &format!("table '{}' does not exist", query.table),
973                    "drop",
974                ));
975            }
976            return Err(RedDBError::NotFound(format!(
977                "table '{}' not found",
978                query.table
979            )));
980        }
981
982        // Remove from IndexStore
983        self.inner.index_store.drop_index(&query.name, &query.table);
984        self.invalidate_plan_cache();
985        // Issue #120 — keep the schema-vocabulary index entry in sync.
986        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
987            collection: query.table.clone(),
988            index: query.name.clone(),
989        });
990
991        Ok(RuntimeQueryResult::ok_message(
992            raw_query.to_string(),
993            &format!("index '{}' dropped from '{}'", query.name, query.table),
994            "drop",
995        ))
996    }
997
998    fn execute_drop_typed_collection(
999        &self,
1000        raw_query: &str,
1001        name: &str,
1002        if_exists: bool,
1003        expected_model: CollectionModel,
1004        label: &str,
1005    ) -> RedDBResult<RuntimeQueryResult> {
1006        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1007        if is_system_schema_name(name) {
1008            return Err(RedDBError::Query("system schema is read-only".to_string()));
1009        }
1010        let store = self.inner.db.store();
1011        if store.get_collection(name).is_none() {
1012            if if_exists {
1013                return Ok(RuntimeQueryResult::ok_message(
1014                    raw_query.to_string(),
1015                    &format!("{label} '{name}' does not exist"),
1016                    "drop",
1017                ));
1018            }
1019            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1020        }
1021
1022        let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1023        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1024        self.drop_collection_storage(raw_query, name, label)
1025    }
1026
1027    pub fn execute_truncate(
1028        &self,
1029        raw_query: &str,
1030        query: &TruncateQuery,
1031    ) -> RedDBResult<RuntimeQueryResult> {
1032        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1033        if is_system_schema_name(&query.name) {
1034            return Err(RedDBError::Query("system schema is read-only".to_string()));
1035        }
1036
1037        let label = query
1038            .model
1039            .map(polymorphic_resolver::model_name)
1040            .unwrap_or("collection");
1041        let store = self.inner.db.store();
1042        if store.get_collection(&query.name).is_none() {
1043            if query.if_exists {
1044                return Ok(RuntimeQueryResult::ok_message(
1045                    raw_query.to_string(),
1046                    &format!("{label} '{}' does not exist", query.name),
1047                    "truncate",
1048                ));
1049            }
1050            return Err(RedDBError::NotFound(format!(
1051                "{label} '{}' not found",
1052                query.name
1053            )));
1054        }
1055
1056        let actual =
1057            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1058        if let Some(expected) = query.model {
1059            polymorphic_resolver::ensure_model_match(expected, actual)?;
1060        }
1061
1062        if actual == CollectionModel::Queue {
1063            return self.execute_queue_command(
1064                raw_query,
1065                &QueueCommand::Purge {
1066                    queue: query.name.clone(),
1067                },
1068            );
1069        }
1070
1071        // Count before wiping so we can emit the aggregated truncate event.
1072        let affected = self.truncate_collection_entities(&query.name)?;
1073        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1074        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1075        self.inner.db.invalidate_vector_index(&query.name);
1076        self.clear_table_planner_stats(&query.name);
1077        self.invalidate_result_cache();
1078
1079        Ok(RuntimeQueryResult::ok_message(
1080            raw_query.to_string(),
1081            &format!(
1082                "{affected} entities truncated from {label} '{}'",
1083                query.name
1084            ),
1085            "truncate",
1086        ))
1087    }
1088
1089    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1090        let store = self.inner.db.store();
1091        let Some(manager) = store.get_collection(name) else {
1092            return Ok(0);
1093        };
1094        let entities = manager.query_all(|_| true);
1095        if entities.is_empty() {
1096            return Ok(0);
1097        }
1098
1099        for entity in &entities {
1100            let fields = entity_index_fields(&entity.data);
1101            self.inner
1102                .index_store
1103                .index_entity_delete(name, entity.id, &fields)
1104                .map_err(RedDBError::Internal)?;
1105        }
1106
1107        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1108        let deleted_ids = store
1109            .delete_batch(name, &ids)
1110            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1111        for id in &deleted_ids {
1112            store.context_index().remove_entity(*id);
1113        }
1114        Ok(deleted_ids.len() as u64)
1115    }
1116
1117    fn drop_collection_storage(
1118        &self,
1119        raw_query: &str,
1120        name: &str,
1121        label: &str,
1122    ) -> RedDBResult<RuntimeQueryResult> {
1123        let store = self.inner.db.store();
1124
1125        // Emit 1 collection_dropped event before storage is wiped.
1126        // Queue is preserved; subscription is removed with the contract below.
1127        let final_count = store
1128            .get_collection(name)
1129            .map(|manager| manager.query_all(|_| true).len() as u64)
1130            .unwrap_or(0);
1131        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1132            self,
1133            name,
1134            final_count,
1135        )?;
1136
1137        let orphaned_indices: Vec<String> = self
1138            .inner
1139            .index_store
1140            .list_indices(name)
1141            .into_iter()
1142            .map(|index| index.name)
1143            .collect();
1144        for index_name in &orphaned_indices {
1145            self.inner.index_store.drop_index(index_name, name);
1146        }
1147
1148        store
1149            .drop_collection(name)
1150            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1151        self.inner.db.invalidate_vector_index(name);
1152        self.inner.db.clear_collection_default_ttl_ms(name);
1153        self.inner
1154            .db
1155            .remove_collection_contract(name)
1156            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1157        self.clear_table_planner_stats(name);
1158        self.invalidate_result_cache();
1159        if let Some(store) = self.inner.auth_store.read().clone() {
1160            store.invalidate_visible_collections_cache();
1161        }
1162        self.inner
1163            .db
1164            .persist_metadata()
1165            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1166        self.schema_vocabulary_apply(
1167            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1168                collection: name.to_string(),
1169            },
1170        );
1171
1172        Ok(RuntimeQueryResult::ok_message(
1173            raw_query.to_string(),
1174            &format!("{label} '{name}' dropped"),
1175            "drop",
1176        ))
1177    }
1178}
1179
/// Returns `true` when `name` belongs to the reserved system schema:
/// exactly `red`, anything under the `red.` namespace, or anything
/// starting with `__red_schema_`. The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1183
1184fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1185    match data {
1186        EntityData::Row(row) => {
1187            if let Some(ref named) = row.named {
1188                named
1189                    .iter()
1190                    .map(|(key, value)| (key.clone(), value.clone()))
1191                    .collect()
1192            } else if let Some(ref schema) = row.schema {
1193                schema
1194                    .iter()
1195                    .zip(row.columns.iter())
1196                    .map(|(key, value)| (key.clone(), value.clone()))
1197                    .collect()
1198            } else {
1199                Vec::new()
1200            }
1201        }
1202        EntityData::Node(node) => node
1203            .properties
1204            .iter()
1205            .map(|(key, value)| (key.clone(), value.clone()))
1206            .collect(),
1207        _ => Vec::new(),
1208    }
1209}
1210
1211fn collection_contract_from_create_table(
1212    query: &CreateTableQuery,
1213) -> RedDBResult<crate::physical::CollectionContract> {
1214    let now = current_unix_ms();
1215    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1216        .columns
1217        .iter()
1218        .map(declared_column_contract_from_ddl)
1219        .collect();
1220    if query.timestamps {
1221        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1222        // columns that the write path populates from
1223        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1224        declared_columns.push(crate::physical::DeclaredColumnContract {
1225            name: "created_at".to_string(),
1226            data_type: "BIGINT".to_string(),
1227            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1228            not_null: true,
1229            default: None,
1230            compress: None,
1231            unique: false,
1232            primary_key: false,
1233            enum_variants: Vec::new(),
1234            array_element: None,
1235            decimal_precision: None,
1236        });
1237        declared_columns.push(crate::physical::DeclaredColumnContract {
1238            name: "updated_at".to_string(),
1239            data_type: "BIGINT".to_string(),
1240            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1241            not_null: true,
1242            default: None,
1243            compress: None,
1244            unique: false,
1245            primary_key: false,
1246            enum_variants: Vec::new(),
1247            array_element: None,
1248            decimal_precision: None,
1249        });
1250    }
1251    Ok(crate::physical::CollectionContract {
1252        name: query.name.clone(),
1253        declared_model: crate::catalog::CollectionModel::Table,
1254        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1255        origin: crate::physical::ContractOrigin::Explicit,
1256        version: 1,
1257        created_at_unix_ms: now,
1258        updated_at_unix_ms: now,
1259        default_ttl_ms: query.default_ttl_ms,
1260        context_index_fields: query.context_index_fields.clone(),
1261        declared_columns,
1262        table_def: Some(build_table_def_from_create_table(query)?),
1263        timestamps_enabled: query.timestamps,
1264        context_index_enabled: query.context_index_enabled
1265            || !query.context_index_fields.is_empty(),
1266        append_only: query.append_only,
1267        subscriptions: query.subscriptions.clone(),
1268    })
1269}
1270
1271fn default_collection_contract_for_existing_table(
1272    name: &str,
1273) -> crate::physical::CollectionContract {
1274    let now = current_unix_ms();
1275    crate::physical::CollectionContract {
1276        name: name.to_string(),
1277        declared_model: crate::catalog::CollectionModel::Table,
1278        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1279        origin: crate::physical::ContractOrigin::Explicit,
1280        version: 0,
1281        created_at_unix_ms: now,
1282        updated_at_unix_ms: now,
1283        default_ttl_ms: None,
1284        context_index_fields: Vec::new(),
1285        declared_columns: Vec::new(),
1286        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1287        timestamps_enabled: false,
1288        context_index_enabled: false,
1289        append_only: false,
1290        subscriptions: Vec::new(),
1291    }
1292}
1293
1294fn keyed_collection_contract(
1295    name: &str,
1296    model: crate::catalog::CollectionModel,
1297) -> crate::physical::CollectionContract {
1298    let now = current_unix_ms();
1299    crate::physical::CollectionContract {
1300        name: name.to_string(),
1301        declared_model: model,
1302        schema_mode: crate::catalog::SchemaMode::Dynamic,
1303        origin: crate::physical::ContractOrigin::Explicit,
1304        version: 1,
1305        created_at_unix_ms: now,
1306        updated_at_unix_ms: now,
1307        default_ttl_ms: None,
1308        context_index_fields: Vec::new(),
1309        declared_columns: Vec::new(),
1310        table_def: None,
1311        timestamps_enabled: false,
1312        context_index_enabled: false,
1313        append_only: false,
1314        subscriptions: Vec::new(),
1315    }
1316}
1317
1318fn declared_column_contract_from_ddl(
1319    column: &CreateColumnDef,
1320) -> crate::physical::DeclaredColumnContract {
1321    crate::physical::DeclaredColumnContract {
1322        name: column.name.clone(),
1323        data_type: column.data_type.clone(),
1324        sql_type: Some(column.sql_type.clone()),
1325        not_null: column.not_null,
1326        default: column.default.clone(),
1327        compress: column.compress,
1328        unique: column.unique,
1329        primary_key: column.primary_key,
1330        enum_variants: column.enum_variants.clone(),
1331        array_element: column.array_element.clone(),
1332        decimal_precision: column.decimal_precision,
1333    }
1334}
1335
1336fn apply_alter_operations_to_contract(
1337    contract: &mut crate::physical::CollectionContract,
1338    operations: &[AlterOperation],
1339) {
1340    if contract.table_def.is_none() {
1341        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1342    }
1343    for operation in operations {
1344        match operation {
1345            AlterOperation::AddColumn(column) => {
1346                if !contract
1347                    .declared_columns
1348                    .iter()
1349                    .any(|existing| existing.name == column.name)
1350                {
1351                    contract
1352                        .declared_columns
1353                        .push(declared_column_contract_from_ddl(column));
1354                }
1355                if let Some(table_def) = contract.table_def.as_mut() {
1356                    if table_def.get_column(&column.name).is_none() {
1357                        if let Ok(column_def) = column_def_from_ddl(column) {
1358                            if column.primary_key {
1359                                table_def.primary_key.push(column.name.clone());
1360                                table_def.constraints.push(
1361                                    crate::storage::schema::Constraint::new(
1362                                        format!("pk_{}", column.name),
1363                                        crate::storage::schema::ConstraintType::PrimaryKey,
1364                                    )
1365                                    .on_columns(vec![column.name.clone()]),
1366                                );
1367                            }
1368                            if column.unique {
1369                                table_def.constraints.push(
1370                                    crate::storage::schema::Constraint::new(
1371                                        format!("uniq_{}", column.name),
1372                                        crate::storage::schema::ConstraintType::Unique,
1373                                    )
1374                                    .on_columns(vec![column.name.clone()]),
1375                                );
1376                            }
1377                            if column.not_null {
1378                                table_def.constraints.push(
1379                                    crate::storage::schema::Constraint::new(
1380                                        format!("not_null_{}", column.name),
1381                                        crate::storage::schema::ConstraintType::NotNull,
1382                                    )
1383                                    .on_columns(vec![column.name.clone()]),
1384                                );
1385                            }
1386                            table_def.columns.push(column_def);
1387                        }
1388                    }
1389                }
1390            }
1391            AlterOperation::DropColumn(name) => {
1392                contract
1393                    .declared_columns
1394                    .retain(|column| column.name != *name);
1395                if let Some(table_def) = contract.table_def.as_mut() {
1396                    if let Some(index) = table_def.column_index(name) {
1397                        table_def.columns.remove(index);
1398                    }
1399                    table_def.primary_key.retain(|column| column != name);
1400                    table_def.constraints.retain(|constraint| {
1401                        !constraint.columns.iter().any(|column| column == name)
1402                    });
1403                    table_def
1404                        .indexes
1405                        .retain(|index| !index.columns.iter().any(|column| column == name));
1406                }
1407            }
1408            AlterOperation::RenameColumn { from, to } => {
1409                if contract
1410                    .declared_columns
1411                    .iter()
1412                    .any(|column| column.name == *to)
1413                {
1414                    continue;
1415                }
1416                if let Some(column) = contract
1417                    .declared_columns
1418                    .iter_mut()
1419                    .find(|column| column.name == *from)
1420                {
1421                    column.name = to.clone();
1422                }
1423                if let Some(table_def) = contract.table_def.as_mut() {
1424                    if let Some(column) = table_def
1425                        .columns
1426                        .iter_mut()
1427                        .find(|column| column.name == *from)
1428                    {
1429                        column.name = to.clone();
1430                    }
1431                    for primary_key in &mut table_def.primary_key {
1432                        if *primary_key == *from {
1433                            *primary_key = to.clone();
1434                        }
1435                    }
1436                    for constraint in &mut table_def.constraints {
1437                        for column in &mut constraint.columns {
1438                            if *column == *from {
1439                                *column = to.clone();
1440                            }
1441                        }
1442                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1443                            for column in ref_columns {
1444                                if *column == *from {
1445                                    *column = to.clone();
1446                                }
1447                            }
1448                        }
1449                    }
1450                    for index in &mut table_def.indexes {
1451                        for column in &mut index.columns {
1452                            if *column == *from {
1453                                *column = to.clone();
1454                            }
1455                        }
1456                    }
1457                }
1458            }
1459            // Partition ops don't touch the column contract — metadata is
1460            // persisted separately via `red_config.partition.*`.
1461            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1462            // RLS toggles don't touch the column contract — flag is persisted
1463            // separately via `red_config.rls.enabled.{table}` and enforced
1464            // through the in-memory `rls_enabled_tables` set.
1465            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1466            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
1467            // and are enforced through `tenant_tables` + RLS auto-policy.
1468            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1469            AlterOperation::SetAppendOnly(on) => {
1470                contract.append_only = *on;
1471            }
1472            // VCS opt-in is persisted to red_vcs_settings by the
1473            // executor, not the contract — nothing to do here.
1474            AlterOperation::SetVersioned(_) => {}
1475            AlterOperation::EnableEvents(subscription) => {
1476                let mut subscription = subscription.clone();
1477                subscription.source = contract.name.clone();
1478                subscription.enabled = true;
1479                if let Some(existing) = contract
1480                    .subscriptions
1481                    .iter_mut()
1482                    .find(|existing| existing.target_queue == subscription.target_queue)
1483                {
1484                    *existing = subscription;
1485                } else {
1486                    contract.subscriptions.push(subscription);
1487                }
1488            }
1489            AlterOperation::DisableEvents => {
1490                for subscription in &mut contract.subscriptions {
1491                    subscription.enabled = false;
1492                }
1493            }
1494            AlterOperation::AddSubscription { name, descriptor } => {
1495                let mut sub = descriptor.clone();
1496                sub.name = name.clone();
1497                sub.source = contract.name.clone();
1498                sub.enabled = true;
1499                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1500                {
1501                    *existing = sub;
1502                } else {
1503                    contract.subscriptions.push(sub);
1504                }
1505            }
1506            AlterOperation::DropSubscription { name } => {
1507                contract.subscriptions.retain(|s| s.name != *name);
1508            }
1509        }
1510    }
1511}
1512
1513fn validate_event_subscriptions(
1514    runtime: &RedDBRuntime,
1515    source: &str,
1516    subscriptions: &[crate::catalog::SubscriptionDescriptor],
1517) -> RedDBResult<()> {
1518    for subscription in subscriptions
1519        .iter()
1520        .filter(|subscription| subscription.enabled)
1521    {
1522        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1523            return Err(RedDBError::Query(
1524                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1525            ));
1526        }
1527        validate_subscription_auth(runtime, source, subscription)?;
1528        if subscription.target_queue == source
1529            || subscription_would_create_cycle(
1530                &runtime.inner.db,
1531                source,
1532                &subscription.target_queue,
1533            )
1534        {
1535            return Err(RedDBError::Query(
1536                "subscription would create cycle".to_string(),
1537            ));
1538        }
1539        audit_subscription_redact_gap(runtime, source, subscription);
1540    }
1541    Ok(())
1542}
1543
1544fn validate_subscription_auth(
1545    runtime: &RedDBRuntime,
1546    source: &str,
1547    subscription: &crate::catalog::SubscriptionDescriptor,
1548) -> RedDBResult<()> {
1549    let auth_store = match runtime.inner.auth_store.read().clone() {
1550        Some(store) => store,
1551        None => return Ok(()),
1552    };
1553    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1554        Some(identity) => identity,
1555        None => return Ok(()),
1556    };
1557    let tenant = crate::runtime::impl_core::current_tenant();
1558    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1559
1560    if auth_store.iam_authorization_enabled() {
1561        let ctx = crate::auth::policies::EvalContext {
1562            principal_tenant: tenant.clone(),
1563            current_tenant: tenant.clone(),
1564            peer_ip: None,
1565            mfa_present: false,
1566            now_ms: crate::auth::now_ms(),
1567            principal_is_admin_role: role == crate::auth::Role::Admin,
1568        };
1569        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
1570        if let Some(t) = tenant.as_deref() {
1571            source_resource = source_resource.with_tenant(t.to_string());
1572        }
1573        if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
1574            return Err(RedDBError::Query(format!(
1575                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
1576                principal, source_resource.kind, source_resource.name
1577            )));
1578        }
1579
1580        let mut target_resource =
1581            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
1582        if let Some(t) = tenant.as_deref() {
1583            target_resource = target_resource.with_tenant(t.to_string());
1584        }
1585        if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
1586            return Err(RedDBError::Query(format!(
1587                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
1588                principal, target_resource.kind, target_resource.name
1589            )));
1590        }
1591        return Ok(());
1592    }
1593
1594    let ctx = crate::auth::privileges::AuthzContext {
1595        principal: &username,
1596        effective_role: role,
1597        tenant: tenant.as_deref(),
1598    };
1599    auth_store
1600        .check_grant(
1601            &ctx,
1602            crate::auth::privileges::Action::Select,
1603            &crate::auth::privileges::Resource::table_from_name(source),
1604        )
1605        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1606    auth_store
1607        .check_grant(
1608            &ctx,
1609            crate::auth::privileges::Action::Insert,
1610            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
1611        )
1612        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
1613    Ok(())
1614}
1615
1616fn audit_subscription_redact_gap(
1617    runtime: &RedDBRuntime,
1618    source: &str,
1619    subscription: &crate::catalog::SubscriptionDescriptor,
1620) {
1621    let auth_store = match runtime.inner.auth_store.read().clone() {
1622        Some(store) if store.iam_authorization_enabled() => store,
1623        _ => return,
1624    };
1625    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
1626        Some(identity) => identity,
1627        None => return,
1628    };
1629    let tenant = crate::runtime::impl_core::current_tenant();
1630    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1631    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
1632    if missing.is_empty() {
1633        return;
1634    }
1635
1636    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
1637    tracing::warn!(
1638        target: "reddb::operator",
1639        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
1640        source,
1641        subscription.target_queue,
1642        columns
1643    );
1644    let mut event = AuditEvent::builder("subscription_redact_gap")
1645        .principal(username)
1646        .source(AuditAuthSource::System)
1647        .resource(format!(
1648            "subscription:{}->{}",
1649            source, subscription.target_queue
1650        ))
1651        .outcome(Outcome::Success)
1652        .field(AuditFieldEscaper::field("source", source))
1653        .field(AuditFieldEscaper::field(
1654            "target_queue",
1655            subscription.target_queue.clone(),
1656        ))
1657        .field(AuditFieldEscaper::field(
1658            "subscription",
1659            subscription.name.clone(),
1660        ))
1661        .field(AuditFieldEscaper::field("columns", columns))
1662        .field(AuditFieldEscaper::field("role", role.as_str()));
1663    if let Some(t) = tenant {
1664        event = event.tenant(t);
1665    }
1666    runtime.inner.audit_log.record_event(event.build());
1667}
1668
1669fn subscription_redact_gap_columns(
1670    auth_store: &crate::auth::store::AuthStore,
1671    principal: &crate::auth::UserId,
1672    source: &str,
1673    subscription: &crate::catalog::SubscriptionDescriptor,
1674) -> BTreeSet<String> {
1675    let redacted: HashSet<String> = subscription
1676        .redact_fields
1677        .iter()
1678        .map(|field| field.to_ascii_lowercase())
1679        .collect();
1680    auth_store
1681        .effective_policies(principal)
1682        .iter()
1683        .flat_map(|policy| policy.statements.iter())
1684        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
1685        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
1686        .flat_map(|statement| statement.resources.iter())
1687        .filter_map(|resource| denied_column_for_source(resource, source))
1688        .filter(|column| !redact_covers_column(&redacted, source, column))
1689        .collect()
1690}
1691
1692fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
1693    match pattern {
1694        crate::auth::policies::ActionPattern::Wildcard => true,
1695        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
1696        crate::auth::policies::ActionPattern::Prefix(prefix) => {
1697            "select".len() > prefix.len() + 1
1698                && "select".starts_with(prefix)
1699                && "select".as_bytes()[prefix.len()] == b':'
1700        }
1701    }
1702}
1703
1704fn denied_column_for_source(
1705    resource: &crate::auth::policies::ResourcePattern,
1706    source: &str,
1707) -> Option<String> {
1708    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
1709        return None;
1710    };
1711    if kind != "column" {
1712        return None;
1713    }
1714    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
1715    (column.table_resource_name() == source).then_some(column.column)
1716}
1717
/// Reports whether the redact set hides `column` of `source`.
///
/// `redacted` is matched as-is; `column` and `source` are ASCII-lowercased
/// before the lookup. A column is covered by the `*` entry, by its bare
/// name, or by its `source.column` qualified name.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    let bare = column.to_ascii_lowercase();
    if redacted.contains("*") || redacted.contains(&bare) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
1723
1724fn subscription_would_create_cycle(
1725    db: &crate::storage::unified::devx::RedDB,
1726    source: &str,
1727    target: &str,
1728) -> bool {
1729    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
1730    for contract in db.collection_contracts() {
1731        for subscription in contract
1732            .subscriptions
1733            .into_iter()
1734            .filter(|subscription| subscription.enabled)
1735        {
1736            graph
1737                .entry(subscription.source)
1738                .or_default()
1739                .push(subscription.target_queue);
1740        }
1741    }
1742    graph
1743        .entry(source.to_string())
1744        .or_default()
1745        .push(target.to_string());
1746
1747    let mut stack = vec![target.to_string()];
1748    let mut seen = HashSet::new();
1749    while let Some(node) = stack.pop() {
1750        if node == source {
1751            return true;
1752        }
1753        if !seen.insert(node.clone()) {
1754            continue;
1755        }
1756        if let Some(next) = graph.get(&node) {
1757            stack.extend(next.iter().cloned());
1758        }
1759    }
1760    false
1761}
1762
/// Crate-visible entry point for `ensure_event_target_queue`.
///
/// Forwards `runtime` and `queue` unchanged and returns the callee's result.
pub(crate) fn ensure_event_target_queue_pub(
    runtime: &RedDBRuntime,
    queue: &str,
) -> RedDBResult<()> {
    ensure_event_target_queue(runtime, queue)
}
1769
1770fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
1771    let store = runtime.inner.db.store();
1772    if store.get_collection(queue).is_some() {
1773        return Ok(());
1774    }
1775    store
1776        .create_collection(queue)
1777        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1778    runtime
1779        .inner
1780        .db
1781        .save_collection_contract(event_queue_collection_contract(queue))
1782        .map_err(|err| RedDBError::Internal(err.to_string()))?;
1783    store.set_config_tree(
1784        &format!("queue.{queue}.mode"),
1785        &crate::serde_json::Value::String("fanout".to_string()),
1786    );
1787    Ok(())
1788}
1789
1790fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
1791    let now = current_unix_ms();
1792    crate::physical::CollectionContract {
1793        name: queue.to_string(),
1794        declared_model: crate::catalog::CollectionModel::Queue,
1795        schema_mode: crate::catalog::SchemaMode::Dynamic,
1796        origin: crate::physical::ContractOrigin::Implicit,
1797        version: 1,
1798        created_at_unix_ms: now,
1799        updated_at_unix_ms: now,
1800        default_ttl_ms: None,
1801        context_index_fields: Vec::new(),
1802        declared_columns: Vec::new(),
1803        table_def: None,
1804        timestamps_enabled: false,
1805        context_index_enabled: false,
1806        append_only: true,
1807        subscriptions: Vec::new(),
1808    }
1809}
1810
1811fn build_table_def_from_create_table(
1812    query: &CreateTableQuery,
1813) -> RedDBResult<crate::storage::schema::TableDef> {
1814    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
1815    for column in &query.columns {
1816        if column.primary_key {
1817            table.primary_key.push(column.name.clone());
1818            table.constraints.push(
1819                crate::storage::schema::Constraint::new(
1820                    format!("pk_{}", column.name),
1821                    crate::storage::schema::ConstraintType::PrimaryKey,
1822                )
1823                .on_columns(vec![column.name.clone()]),
1824            );
1825        }
1826        if column.unique {
1827            table.constraints.push(
1828                crate::storage::schema::Constraint::new(
1829                    format!("uniq_{}", column.name),
1830                    crate::storage::schema::ConstraintType::Unique,
1831                )
1832                .on_columns(vec![column.name.clone()]),
1833            );
1834        }
1835        if column.not_null {
1836            table.constraints.push(
1837                crate::storage::schema::Constraint::new(
1838                    format!("not_null_{}", column.name),
1839                    crate::storage::schema::ConstraintType::NotNull,
1840                )
1841                .on_columns(vec![column.name.clone()]),
1842            );
1843        }
1844        table.columns.push(column_def_from_ddl(column)?);
1845    }
1846    // WITH timestamps = true: append the two runtime-managed columns
1847    // to the schema so resolved_contract_columns exposes them to the
1848    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
1849    // not-nullable; the write path auto-fills them.
1850    if query.timestamps {
1851        table.columns.push(
1852            crate::storage::schema::ColumnDef::new(
1853                "created_at".to_string(),
1854                crate::storage::schema::DataType::UnsignedInteger,
1855            )
1856            .not_null(),
1857        );
1858        table.columns.push(
1859            crate::storage::schema::ColumnDef::new(
1860                "updated_at".to_string(),
1861                crate::storage::schema::DataType::UnsignedInteger,
1862            )
1863            .not_null(),
1864        );
1865        table.constraints.push(
1866            crate::storage::schema::Constraint::new(
1867                "not_null_created_at".to_string(),
1868                crate::storage::schema::ConstraintType::NotNull,
1869            )
1870            .on_columns(vec!["created_at".to_string()]),
1871        );
1872        table.constraints.push(
1873            crate::storage::schema::Constraint::new(
1874                "not_null_updated_at".to_string(),
1875                crate::storage::schema::ConstraintType::NotNull,
1876            )
1877            .on_columns(vec!["updated_at".to_string()]),
1878        );
1879    }
1880    table
1881        .validate()
1882        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
1883    Ok(table)
1884}
1885
1886fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
1887    let data_type = resolve_declared_data_type(&column.data_type)
1888        .map_err(|err| RedDBError::Query(err.to_string()))?;
1889    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
1890    if column.not_null {
1891        column_def = column_def.not_null();
1892    }
1893    if let Some(default) = &column.default {
1894        column_def = column_def.with_default(default.as_bytes().to_vec());
1895    }
1896    if column.compress.unwrap_or(0) > 0 {
1897        column_def = column_def.compressed();
1898    }
1899    if !column.enum_variants.is_empty() {
1900        column_def = column_def.with_variants(column.enum_variants.clone());
1901    }
1902    if let Some(precision) = column.decimal_precision {
1903        column_def = column_def.with_precision(precision);
1904    }
1905    if let Some(element_type) = &column.array_element {
1906        column_def = column_def.with_element_type(
1907            resolve_declared_data_type(element_type)
1908                .map_err(|err| RedDBError::Query(err.to_string()))?,
1909        );
1910    }
1911    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
1912    if column.unique {
1913        column_def = column_def.with_metadata("unique", "true");
1914    }
1915    if column.primary_key {
1916        column_def = column_def.with_metadata("primary_key", "true");
1917    }
1918    Ok(column_def)
1919}
1920
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, `0` is returned.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
1927
1928#[cfg(test)]
1929mod tests {
1930    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
1931    use crate::auth::store::{AuthStore, PrincipalRef};
1932    use crate::auth::UserId;
1933    use crate::auth::{AuthConfig, Role};
1934    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
1935    use crate::storage::schema::Value;
1936    use crate::{RedDBOptions, RedDBRuntime};
1937    use std::sync::Arc;
1938
1939    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
1940        Policy {
1941            id: id.to_string(),
1942            version: 1,
1943            tenant: None,
1944            created_at: 0,
1945            updated_at: 0,
1946            statements: vec![Statement {
1947                sid: None,
1948                effect: Effect::Allow,
1949                actions: vec![ActionPattern::Exact(action.to_string())],
1950                resources: vec![ResourcePattern::Exact {
1951                    kind: "collection".to_string(),
1952                    name: collection.to_string(),
1953                }],
1954                condition: None,
1955            }],
1956        }
1957    }
1958
    /// Creates an `AuthStore` with the default `AuthConfig`, installs it in
    /// the runtime's `auth_store` slot, and returns a handle to the same store.
    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
        let store = Arc::new(AuthStore::new(AuthConfig::default()));
        // The runtime receives a clone of the Arc; the test keeps the original.
        *rt.inner.auth_store.write() = Some(store.clone());
        store
    }
1964
1965    #[test]
1966    fn drop_denied_without_iam_policy() {
1967        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
1968        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
1969        let store = wire_auth_store(&rt);
1970        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
1971        let select_only = Policy {
1972            id: "select-only".to_string(),
1973            version: 1,
1974            tenant: None,
1975            created_at: 0,
1976            updated_at: 0,
1977            statements: vec![Statement {
1978                sid: None,
1979                effect: Effect::Allow,
1980                actions: vec![ActionPattern::Exact("select".to_string())],
1981                resources: vec![ResourcePattern::Wildcard],
1982                condition: None,
1983            }],
1984        };
1985        store.put_policy_internal(select_only).unwrap();
1986        let alice = UserId::from_parts(None, "alice");
1987        store
1988            .attach_policy(PrincipalRef::User(alice), "select-only")
1989            .unwrap();
1990        set_current_auth_identity("alice".to_string(), Role::Write);
1991        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
1992        clear_current_auth_identity();
1993        assert!(
1994            format!("{err}").contains("denied by IAM policy"),
1995            "got: {err}"
1996        );
1997    }
1998
1999    #[test]
2000    fn drop_allowed_with_explicit_iam_policy() {
2001        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2002        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2003        let store = wire_auth_store(&rt);
2004        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2005        store.put_policy_internal(policy).unwrap();
2006        let bob = UserId::from_parts(None, "bob");
2007        store
2008            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2009            .unwrap();
2010        set_current_auth_identity("bob".to_string(), Role::Write);
2011        rt.execute_query("DROP TABLE bar").unwrap();
2012        clear_current_auth_identity();
2013    }
2014
2015    #[test]
2016    fn drop_allowed_with_wildcard_iam_policy() {
2017        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2018        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2019        let store = wire_auth_store(&rt);
2020        let policy = Policy {
2021            id: "allow-drop-all".to_string(),
2022            version: 1,
2023            tenant: None,
2024            created_at: 0,
2025            updated_at: 0,
2026            statements: vec![Statement {
2027                sid: None,
2028                effect: Effect::Allow,
2029                actions: vec![ActionPattern::Exact("drop".to_string())],
2030                resources: vec![ResourcePattern::Wildcard],
2031                condition: None,
2032            }],
2033        };
2034        store.put_policy_internal(policy).unwrap();
2035        let carl = UserId::from_parts(None, "carl");
2036        store
2037            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2038            .unwrap();
2039        set_current_auth_identity("carl".to_string(), Role::Write);
2040        rt.execute_query("DROP TABLE baz").unwrap();
2041        clear_current_auth_identity();
2042    }
2043
2044    #[test]
2045    fn truncate_denied_without_iam_policy() {
2046        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2047        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2048        let store = wire_auth_store(&rt);
2049        // A policy exists (IAM active) but gives no truncate right.
2050        let select_only = Policy {
2051            id: "select-only-2".to_string(),
2052            version: 1,
2053            tenant: None,
2054            created_at: 0,
2055            updated_at: 0,
2056            statements: vec![Statement {
2057                sid: None,
2058                effect: Effect::Allow,
2059                actions: vec![ActionPattern::Exact("select".to_string())],
2060                resources: vec![ResourcePattern::Wildcard],
2061                condition: None,
2062            }],
2063        };
2064        store.put_policy_internal(select_only).unwrap();
2065        let dana = UserId::from_parts(None, "dana");
2066        store
2067            .attach_policy(PrincipalRef::User(dana), "select-only-2")
2068            .unwrap();
2069        set_current_auth_identity("dana".to_string(), Role::Write);
2070        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2071        clear_current_auth_identity();
2072        assert!(
2073            format!("{err}").contains("denied by IAM policy"),
2074            "got: {err}"
2075        );
2076    }
2077
2078    #[test]
2079    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2080        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2081        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2082            .unwrap();
2083        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2084            .unwrap();
2085        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2086            .unwrap();
2087
2088        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2089        assert_eq!(truncated.statement_type, "truncate");
2090        assert_eq!(truncated.affected_rows, 0);
2091
2092        let empty = rt.execute_query("SELECT id FROM users").unwrap();
2093        assert!(empty.result.records.is_empty());
2094
2095        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2096            .unwrap();
2097        let selected = rt
2098            .execute_query("SELECT name FROM users WHERE id = 3")
2099            .unwrap();
2100        let name = selected.result.records[0].get("name").unwrap();
2101        assert_eq!(name, &Value::text("cy"));
2102        assert!(rt.db().collection_contract("users").is_some());
2103        assert!(rt
2104            .inner
2105            .index_store
2106            .list_indices("users")
2107            .iter()
2108            .any(|index| index.name == "idx_users_id"));
2109    }
2110
2111    #[test]
2112    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2113        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2114        rt.execute_query("CREATE QUEUE tasks").unwrap();
2115        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2116
2117        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2118        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2119
2120        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2121        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2122        assert_eq!(
2123            len.result.records[0].get("len"),
2124            Some(&Value::UnsignedInteger(0))
2125        );
2126    }
2127
2128    #[test]
2129    fn truncate_system_schema_is_read_only() {
2130        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2131        let err = rt
2132            .execute_query("TRUNCATE COLLECTION red.collections")
2133            .unwrap_err();
2134        assert!(format!("{err}").contains("system schema is read-only"));
2135    }
2136
2137    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────
2138
2139    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2140        let result = rt
2141            .execute_query(&format!("QUEUE PEEK {queue} 100"))
2142            .expect("peek queue");
2143        result
2144            .result
2145            .records
2146            .iter()
2147            .map(
2148                |record| match record.get("payload").expect("payload column") {
2149                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2150                    other => panic!("expected JSON queue payload, got {other:?}"),
2151                },
2152            )
2153            .collect()
2154    }
2155
2156    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
2157    /// `truncate` event, not one delete event per row.
2158    #[test]
2159    fn truncate_event_enabled_table_emits_single_truncate_event() {
2160        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2161        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2162            .unwrap();
2163        rt.execute_query(
2164            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2165        )
2166        .unwrap();
2167
2168        // Drain the 3 insert events so we start clean.
2169        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2170
2171        rt.execute_query("TRUNCATE TABLE users").unwrap();
2172
2173        let events = queue_payloads(&rt, "users_events");
2174        // Must be exactly 1 truncate event, not 3 delete events.
2175        assert_eq!(
2176            events.len(),
2177            1,
2178            "expected 1 truncate event, got {}",
2179            events.len()
2180        );
2181        let ev = events[0].as_object().expect("event is object");
2182        assert_eq!(
2183            ev.get("op").and_then(crate::json::Value::as_str),
2184            Some("truncate")
2185        );
2186        assert_eq!(
2187            ev.get("collection").and_then(crate::json::Value::as_str),
2188            Some("users")
2189        );
2190        assert_eq!(
2191            ev.get("entities_count")
2192                .and_then(crate::json::Value::as_u64),
2193            Some(3)
2194        );
2195        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2196        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2197        assert!(ev
2198            .get("event_id")
2199            .and_then(crate::json::Value::as_str)
2200            .is_some_and(|s| !s.is_empty()));
2201    }
2202
2203    /// `TRUNCATE users` on a collection without event subscription emits no events.
2204    #[test]
2205    fn truncate_no_events_collection_emits_nothing() {
2206        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2207        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2208            .unwrap();
2209        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2210            .unwrap();
2211        // No EVENTS subscription — truncate must work without touching any queue.
2212        rt.execute_query("TRUNCATE TABLE plain").unwrap();
2213        // No crash, no queue to check. Just verify truncation happened.
2214        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2215        assert!(rows.result.records.is_empty());
2216    }
2217
2218    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
2219    /// `collection_dropped` event. The subscription is removed from the
2220    /// source contract but the target queue is preserved for consumer drain.
2221    #[test]
2222    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2223        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2224        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2225            .unwrap();
2226        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2227            .unwrap();
2228
2229        // Drain insert events so we start clean.
2230        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2231
2232        rt.execute_query("DROP TABLE users").unwrap();
2233
2234        // Queue must still exist with 1 collection_dropped event.
2235        let events = queue_payloads(&rt, "users_events");
2236        assert_eq!(
2237            events.len(),
2238            1,
2239            "expected 1 collection_dropped event, got {}",
2240            events.len()
2241        );
2242        let ev = events[0].as_object().expect("event is object");
2243        assert_eq!(
2244            ev.get("op").and_then(crate::json::Value::as_str),
2245            Some("collection_dropped")
2246        );
2247        assert_eq!(
2248            ev.get("collection").and_then(crate::json::Value::as_str),
2249            Some("users")
2250        );
2251        assert_eq!(
2252            ev.get("final_entities_count")
2253                .and_then(crate::json::Value::as_u64),
2254            Some(2)
2255        );
2256        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2257        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2258        assert!(ev
2259            .get("event_id")
2260            .and_then(crate::json::Value::as_str)
2261            .is_some_and(|s| !s.is_empty()));
2262
2263        // Source collection is gone.
2264        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2265        assert!(
2266            format!("{err}").contains("users"),
2267            "expected not-found error"
2268        );
2269    }
2270
2271    /// `DROP TABLE users` on a collection without event subscription works
2272    /// normally with no event emitted.
2273    #[test]
2274    fn drop_no_events_collection_emits_nothing() {
2275        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2276        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2277            .unwrap();
2278        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2279            .unwrap();
2280        rt.execute_query("DROP TABLE plain").unwrap();
2281        // No crash and collection is gone.
2282        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2283        assert!(format!("{err}").contains("plain"));
2284    }
2285
2286    // ── #297: ops_filter + WHERE filter ────────────────────────────────────
2287
2288    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
2289    #[test]
2290    fn ops_filter_insert_only_ignores_update_and_delete() {
2291        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2292        rt.execute_query(
2293            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2294        )
2295        .unwrap();
2296        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
2297            .unwrap();
2298        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
2299            .unwrap();
2300        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
2301
2302        let events = queue_payloads(&rt, "items_events");
2303        // Only the INSERT should have fired.
2304        assert_eq!(
2305            events.len(),
2306            1,
2307            "expected 1 insert event, got {}",
2308            events.len()
2309        );
2310        assert_eq!(
2311            events[0]
2312                .as_object()
2313                .unwrap()
2314                .get("op")
2315                .and_then(crate::json::Value::as_str),
2316            Some("insert")
2317        );
2318    }
2319
2320    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
2321    #[test]
2322    fn where_filter_skips_rows_that_do_not_match() {
2323        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2324        rt.execute_query(
2325            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
2326        )
2327        .unwrap();
2328
2329        // This row should generate an event.
2330        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
2331            .unwrap();
2332        // This row should NOT generate an event.
2333        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
2334            .unwrap();
2335
2336        let events = queue_payloads(&rt, "users_events");
2337        assert_eq!(
2338            events.len(),
2339            1,
2340            "expected 1 event (only active), got {}",
2341            events.len()
2342        );
2343        let ev = events[0].as_object().unwrap();
2344        assert_eq!(
2345            ev.get("op").and_then(crate::json::Value::as_str),
2346            Some("insert")
2347        );
2348        let after = ev.get("after").unwrap().as_object().unwrap();
2349        assert_eq!(
2350            after.get("status").and_then(crate::json::Value::as_str),
2351            Some("active")
2352        );
2353    }
2354
2355    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
2356    #[test]
2357    fn ops_filter_and_where_filter_combined() {
2358        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2359        rt.execute_query(
2360            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
2361        )
2362        .unwrap();
2363
2364        // INSERT active → event
2365        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
2366            .unwrap();
2367        // INSERT inactive → no event
2368        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
2369            .unwrap();
2370        // UPDATE row 1 to inactive → after = inactive, no event
2371        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
2372            .unwrap();
2373        // DELETE → ops_filter excludes it
2374        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
2375
2376        let events = queue_payloads(&rt, "items_events");
2377        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
2378        assert_eq!(
2379            events.len(),
2380            1,
2381            "expected 1 event, got {}: {events:?}",
2382            events.len()
2383        );
2384        assert_eq!(
2385            events[0]
2386                .as_object()
2387                .unwrap()
2388                .get("op")
2389                .and_then(crate::json::Value::as_str),
2390            Some("insert")
2391        );
2392    }
2393
2394    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
2395    #[test]
2396    fn where_filter_on_delete_checks_before_state() {
2397        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2398        rt.execute_query(
2399            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
2400        )
2401        .unwrap();
2402
2403        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
2404            .unwrap();
2405
2406        // Delete active row → event (before-state was active)
2407        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
2408        // Delete inactive row → no event (before-state was inactive)
2409        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
2410
2411        let events = queue_payloads(&rt, "users_events");
2412        assert_eq!(
2413            events.len(),
2414            1,
2415            "expected 1 delete event, got {}",
2416            events.len()
2417        );
2418        let ev = events[0].as_object().unwrap();
2419        assert_eq!(
2420            ev.get("op").and_then(crate::json::Value::as_str),
2421            Some("delete")
2422        );
2423    }
2424
2425    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────
2426
2427    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
2428    #[test]
2429    fn alter_add_column_on_event_enabled_table_succeeds() {
2430        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2431        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2432            .unwrap();
2433        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
2434        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
2435            .unwrap();
2436        // The column is now in the contract.
2437        let contract = rt.db().collection_contract("users").unwrap();
2438        assert!(
2439            contract.declared_columns.iter().any(|c| c.name == "phone"),
2440            "phone column should be in contract"
2441        );
2442        // Subscription still enabled after the alter.
2443        assert!(
2444            contract.subscriptions.iter().any(|s| s.enabled),
2445            "subscription should remain enabled"
2446        );
2447    }
2448
2449    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
2450    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
2451    #[test]
2452    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
2453        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2454        rt.execute_query(
2455            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
2456        )
2457        .unwrap();
2458        // DROP COLUMN — schema change event path exercises, must not error.
2459        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
2460            .unwrap();
2461        let contract = rt.db().collection_contract("items").unwrap();
2462        assert!(
2463            !contract.declared_columns.iter().any(|c| c.name == "secret"),
2464            "secret column should be removed"
2465        );
2466        // ENABLE RLS — non-column op, no schema-change event (coverage).
2467        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
2468            .unwrap();
2469        // Collection and subscription still intact.
2470        assert!(
2471            contract.subscriptions.iter().any(|s| s.enabled),
2472            "subscription should remain enabled"
2473        );
2474    }
2475}