Skip to main content

reddb_server/runtime/
impl_ddl.rs

1//! DDL execution: CREATE TABLE, DROP TABLE, ALTER TABLE via SQL AST
2//!
3//! Translates DDL statements into collection-level operations on the
4//! underlying `UnifiedStore`.  RedDB uses a flexible schema-on-read
5//! model, so column definitions are advisory metadata rather than
6//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault key-value entry name under which a collection's own
/// master key is stored: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        crate::reserved_fields::ensure_no_reserved_public_item_fields(
37            query.columns.iter().map(|column| column.name.as_str()),
38            &format!("table '{}'", query.name),
39        )?;
40        // Check if the collection already exists.
41        let exists = store.get_collection(&query.name).is_some();
42        if exists {
43            if query.if_not_exists {
44                return Ok(RuntimeQueryResult::ok_message(
45                    raw_query.to_string(),
46                    &format!("table '{}' already exists", query.name),
47                    "create",
48                ));
49            }
50            return Err(RedDBError::Query(format!(
51                "table '{}' already exists",
52                query.name
53            )));
54        }
55
56        // Build and validate the contract before mutating storage so invalid
57        // SQL types / duplicate columns do not leave partial side effects.
58        let contract = collection_contract_from_create_table(query)?;
59        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60        // Create the collection.
61        store
62            .create_collection(&query.name)
63            .map_err(|err| RedDBError::Internal(err.to_string()))?;
64        for subscription in &contract.subscriptions {
65            ensure_event_target_queue(self, &subscription.target_queue)?;
66        }
67        if let Some(default_ttl_ms) = query.default_ttl_ms {
68            self.inner
69                .db
70                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71        }
72        self.inner
73            .db
74            .save_collection_contract(contract)
75            .map_err(|err| RedDBError::Internal(err.to_string()))?;
76        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77            store.set_config_tree(
78                &format!("red.collection_tenants.{}", query.name),
79                &crate::serde_json::Value::String(tenant_id),
80            );
81        }
82        self.inner
83            .db
84            .persist_metadata()
85            .map_err(|err| RedDBError::Internal(err.to_string()))?;
86        self.refresh_table_planner_stats(&query.name);
87        self.invalidate_result_cache();
88        // Issue #120 — feed the create into the schema-vocabulary
89        // reverse index so AskPipeline (#121) sees this collection.
90        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91        self.schema_vocabulary_apply(
92            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93                collection: query.name.clone(),
94                columns,
95                type_tags: Vec::new(),
96                description: None,
97            },
98        );
99        // Partition metadata (Phase 2.2 PG parity).
100        //
101        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
102        // clause, stamp the partition config into `red_config` under
103        // `partition.{table}.{by,column}`. Children are registered separately
104        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
105        if let Some(spec) = &query.partition_by {
106            let kind_str = match spec.kind {
107                crate::storage::query::ast::PartitionKind::Range => "range",
108                crate::storage::query::ast::PartitionKind::List => "list",
109                crate::storage::query::ast::PartitionKind::Hash => "hash",
110            };
111            store.set_config_tree(
112                &format!("partition.{}.by", query.name),
113                &crate::serde_json::Value::String(kind_str.to_string()),
114            );
115            store.set_config_tree(
116                &format!("partition.{}.column", query.name),
117                &crate::serde_json::Value::String(spec.column.clone()),
118            );
119        }
120
121        // Table-scoped multi-tenancy (Phase 2.5.4).
122        //
123        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
124        //   1. Persists the `tenant_tables.{table}.column` marker so
125        //      INSERTs can auto-fill and future opens re-hydrate.
126        //   2. Registers the table in the in-memory `tenant_tables`
127        //      HashMap used by the DML auto-fill path.
128        //   3. Installs an implicit RLS policy equivalent to
129        //      `USING (col = CURRENT_TENANT())` across all actions.
130        //   4. Flips `rls_enabled_tables` on so the policy applies.
131        if let Some(col) = &query.tenant_by {
132            store.set_config_tree(
133                &format!("tenant_tables.{}.column", query.name),
134                &crate::serde_json::Value::String(col.clone()),
135            );
136            self.register_tenant_table(&query.name, col);
137        }
138
139        let ttl_suffix = query
140            .default_ttl_ms
141            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142            .unwrap_or_default();
143
144        let tenant_suffix = query
145            .tenant_by
146            .as_ref()
147            .map(|col| format!(" (tenant-scoped by {col})"))
148            .unwrap_or_default();
149
150        Ok(RuntimeQueryResult::ok_message(
151            raw_query.to_string(),
152            &format!(
153                "table '{}' created{}{}",
154                query.name, ttl_suffix, tenant_suffix
155            ),
156            "create",
157        ))
158    }
159
160    fn execute_create_keyed_collection(
161        &self,
162        raw_query: &str,
163        query: &CreateTableQuery,
164    ) -> RedDBResult<RuntimeQueryResult> {
165        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166        if is_system_schema_name(&query.name) {
167            return Err(RedDBError::Query("system schema is read-only".to_string()));
168        }
169        let store = self.inner.db.store();
170        let label = polymorphic_resolver::model_name(query.collection_model);
171        if store.get_collection(&query.name).is_some() {
172            if query.if_not_exists {
173                return Ok(RuntimeQueryResult::ok_message(
174                    raw_query.to_string(),
175                    &format!("{label} '{}' already exists", query.name),
176                    "create",
177                ));
178            }
179            return Err(RedDBError::Query(format!(
180                "{label} '{}' already exists",
181                query.name
182            )));
183        }
184
185        store
186            .create_collection(&query.name)
187            .map_err(|err| RedDBError::Internal(err.to_string()))?;
188        if query.collection_model == CollectionModel::Vault {
189            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190            let key_scope = if query.vault_own_master_key {
191                "own"
192            } else {
193                "cluster"
194            };
195            store.set_config_tree(
196                &format!("red.vault.{}.key_scope", query.name),
197                &crate::serde_json::Value::String(key_scope.to_string()),
198            );
199            store.set_config_tree(
200                &format!("red.vault.{}.status", query.name),
201                &crate::serde_json::Value::String("sealed".to_string()),
202            );
203        }
204        if query.collection_model == CollectionModel::Metrics {
205            for spec in &query.metrics_rollup_policies {
206                let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207                    .ok_or_else(|| {
208                    RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209                })?;
210                if policy.source != "raw" {
211                    return Err(RedDBError::Query(format!(
212                        "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213                        spec
214                    )));
215                }
216                if !matches!(
217                    policy.aggregation.as_str(),
218                    "avg" | "sum" | "min" | "max" | "count"
219                ) {
220                    return Err(RedDBError::Query(format!(
221                        "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222                        spec
223                    )));
224                }
225            }
226            if let Some(raw_retention_ms) = query.default_ttl_ms {
227                self.inner
228                    .db
229                    .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230                store.set_config_tree(
231                    &format!("red.metrics.{}.raw_retention_ms", query.name),
232                    &crate::serde_json::Value::Number(raw_retention_ms as f64),
233                );
234            }
235            let tenant_identity = query
236                .tenant_by
237                .clone()
238                .unwrap_or_else(|| "current_tenant".to_string());
239            store.set_config_tree(
240                &format!("red.metrics.{}.tenant_identity", query.name),
241                &crate::serde_json::Value::String(tenant_identity),
242            );
243            store.set_config_tree(
244                &format!("red.metrics.{}.namespace", query.name),
245                &crate::serde_json::Value::String("default".to_string()),
246            );
247            if !query.metrics_rollup_policies.is_empty() {
248                store.set_config_tree(
249                    &format!("red.metrics.{}.rollup_policies", query.name),
250                    &crate::serde_json::Value::Array(
251                        query
252                            .metrics_rollup_policies
253                            .iter()
254                            .cloned()
255                            .map(crate::serde_json::Value::String)
256                            .collect(),
257                    ),
258                );
259            }
260        }
261        let contract = if query.collection_model == CollectionModel::Metrics {
262            metrics_collection_contract(query)
263        } else {
264            keyed_collection_contract(
265                &query.name,
266                query.collection_model,
267                query.analytics_config.clone(),
268            )
269        };
270        self.inner
271            .db
272            .save_collection_contract(contract)
273            .map_err(|err| RedDBError::Internal(err.to_string()))?;
274        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
275            store.set_config_tree(
276                &format!("red.collection_tenants.{}", query.name),
277                &crate::serde_json::Value::String(tenant_id),
278            );
279        }
280        self.inner
281            .db
282            .persist_metadata()
283            .map_err(|err| RedDBError::Internal(err.to_string()))?;
284        self.invalidate_result_cache();
285
286        Ok(RuntimeQueryResult::ok_message(
287            raw_query.to_string(),
288            &format!("{label} '{}' created", query.name),
289            "create",
290        ))
291    }
292
293    /// Bootstrap a system-owned graph collection declared `WITH ANALYTICS`
294    /// (issue #803). The SQL DDL path refuses `red.*` names via
295    /// [`is_system_schema_name`], so built-ins like `red.topology.cluster`
296    /// cannot be created through `CREATE GRAPH`; this method drives the same
297    /// contract-build / save / persist path directly, bypassing only that
298    /// guard. Idempotent: a no-op when the collection already exists (its
299    /// analytics contract is recovered from the WAL on restart), so it is safe
300    /// to call unconditionally on every boot.
301    pub fn ensure_system_graph_with_analytics(
302        &self,
303        name: &str,
304        outputs: &[crate::catalog::AnalyticsOutput],
305    ) -> RedDBResult<()> {
306        let store = self.inner.db.store();
307        if store.get_collection(name).is_some() {
308            return Ok(());
309        }
310        let analytics_config = outputs
311            .iter()
312            .map(|output| crate::catalog::AnalyticsViewDescriptor {
313                output: *output,
314                algorithm: None,
315                resolution: None,
316                max_iterations: None,
317                tolerance: None,
318            })
319            .collect();
320        store
321            .create_collection(name)
322            .map_err(|err| RedDBError::Internal(err.to_string()))?;
323        let contract = keyed_collection_contract(
324            name,
325            crate::catalog::CollectionModel::Graph,
326            analytics_config,
327        );
328        self.inner
329            .db
330            .save_collection_contract(contract)
331            .map_err(|err| RedDBError::Internal(err.to_string()))?;
332        self.inner
333            .db
334            .persist_metadata()
335            .map_err(|err| RedDBError::Internal(err.to_string()))?;
336        self.invalidate_result_cache_process_only();
337        Ok(())
338    }
339
340    pub fn execute_create_collection(
341        &self,
342        raw_query: &str,
343        query: &CreateCollectionQuery,
344    ) -> RedDBResult<RuntimeQueryResult> {
345        let model = match query.kind.as_str() {
346            "graph" => CollectionModel::Graph,
347            "document" => CollectionModel::Document,
348            "metrics" => CollectionModel::Metrics,
349            "vector.turbo" => {
350                let dimension = query.vector_dimension.ok_or_else(|| {
351                    RedDBError::Query(
352                        "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
353                    )
354                })?;
355                let create = CreateVectorQuery {
356                    name: query.name.clone(),
357                    dimension,
358                    metric: query
359                        .vector_metric
360                        .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
361                    if_not_exists: query.if_not_exists,
362                };
363                let result = self.execute_create_vector(raw_query, &create)?;
364                // Issue #693 — durable kind marker that distinguishes
365                // this collection from the legacy `vector` path on
366                // every restart. Runtime state (TurboQuantIndex +
367                // TurboExtent) is lazily materialised by
368                // `RedDB::turbo_state` on first INSERT/SEARCH so the
369                // marker is the only thing that needs to survive.
370                let store = self.inner.db.store();
371                crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
372                self.inner
373                    .db
374                    .persist_metadata()
375                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
376                // Eagerly construct the state so the TurboExtent (if
377                // any) is reserved at creation time rather than on
378                // first INSERT. Errors are swallowed: the lazy path in
379                // `turbo_state` will retry on the next access.
380                let _ = self.inner.db.turbo_state(&query.name);
381                return Ok(result);
382            }
383            // KIND blockchain — issue #523 foundation: stored on top of a
384            // Table-shaped collection. The `chain` marker + reserved-column
385            // discipline make the difference. Schema validation, conflict
386            // retries, and `verify_chain` come in later iterations.
387            "blockchain" => CollectionModel::Table,
388            other => {
389                return Err(RedDBError::Query(format!(
390                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
391                )));
392            }
393        };
394        let create = CreateTableQuery {
395            collection_model: model,
396            name: query.name.clone(),
397            columns: Vec::new(),
398            if_not_exists: query.if_not_exists,
399            default_ttl_ms: None,
400            metrics_rollup_policies: Vec::new(),
401            context_index_fields: Vec::new(),
402            context_index_enabled: false,
403            timestamps: false,
404            partition_by: None,
405            tenant_by: None,
406            append_only: false,
407            subscriptions: Vec::new(),
408            analytics_config: Vec::new(),
409            vault_own_master_key: false,
410        };
411        let result = self.execute_create_table(raw_query, &create)?;
412        if query.kind == "blockchain" {
413            self.install_blockchain_kind(&query.name)?;
414        }
415        // Issue #522 — wire `SIGNED_BY (...)` into the runtime. The parser
416        // already produces a validated 32-byte pubkey list; installing
417        // the registry stamps the per-collection signer set into
418        // `red_config` so the INSERT path can load it cheaply.
419        if !query.allowed_signers.is_empty() {
420            let actor = crate::runtime::impl_core::current_user_projected()
421                .unwrap_or_else(|| "@system/create-collection".to_string());
422            crate::runtime::signed_writes_kind::install(
423                &self.inner.db.store(),
424                &query.name,
425                &query.allowed_signers,
426                &actor,
427            );
428        }
429        Ok(result)
430    }
431
432    /// Stamp `red.collection.{name}.kind = "chain"` and append the genesis
433    /// row. Idempotent against `IF NOT EXISTS`: if the collection already
434    /// has a row at height 0 we leave it.
435    fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
436        use crate::runtime::blockchain_kind;
437        use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
438        use std::sync::Arc;
439
440        let store = self.inner.db.store();
441        blockchain_kind::mark_as_chain(&store, name);
442
443        let existing_tip = blockchain_kind::chain_tip(&store, name);
444        if existing_tip.height.is_some() {
445            return Ok(());
446        }
447
448        let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
449        let named: std::collections::HashMap<String, crate::storage::schema::Value> =
450            fields.into_iter().collect();
451        let entity = UnifiedEntity::new(
452            EntityId::new(0),
453            EntityKind::TableRow {
454                table: Arc::from(name),
455                row_id: 0,
456            },
457            EntityData::Row(RowData {
458                columns: Vec::new(),
459                named: Some(named),
460                schema: None,
461            }),
462        );
463        store
464            .insert_auto(name, entity)
465            .map_err(|err| RedDBError::Internal(err.to_string()))?;
466        // #524: prime the in-memory tip cache so the chain-tip endpoint and
467        // subsequent INSERTs don't have to scan the collection to find genesis.
468        if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
469            self.inner
470                .chain_tip_cache
471                .lock()
472                .insert(name.to_string(), tip);
473        }
474        Ok(())
475    }
476
477    pub fn execute_create_vector(
478        &self,
479        raw_query: &str,
480        query: &CreateVectorQuery,
481    ) -> RedDBResult<RuntimeQueryResult> {
482        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
483        if is_system_schema_name(&query.name) {
484            return Err(RedDBError::Query("system schema is read-only".to_string()));
485        }
486        let store = self.inner.db.store();
487        if store.get_collection(&query.name).is_some() {
488            if query.if_not_exists {
489                return Ok(RuntimeQueryResult::ok_message(
490                    raw_query.to_string(),
491                    &format!("vector '{}' already exists", query.name),
492                    "create",
493                ));
494            }
495            return Err(RedDBError::Query(format!(
496                "vector '{}' already exists",
497                query.name
498            )));
499        }
500
501        store
502            .create_collection(&query.name)
503            .map_err(|err| RedDBError::Internal(err.to_string()))?;
504        self.inner
505            .db
506            .save_collection_contract(vector_collection_contract(query))
507            .map_err(|err| RedDBError::Internal(err.to_string()))?;
508        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
509            store.set_config_tree(
510                &format!("red.collection_tenants.{}", query.name),
511                &crate::serde_json::Value::String(tenant_id),
512            );
513        }
514        // Slice G (#675) — register `vector.turbo` as the built-in
515        // baseline for every newly-created vector collection. The
516        // marker is the durable signal `RedDB::turbo_state` reads to
517        // materialise the TurboQuantIndex + TurboExtent on first
518        // INSERT/SEARCH. Pre-existing vector collections were created
519        // without the marker and stay on the brute-force fallback path
520        // until they are explicitly migrated — there is no implicit
521        // backfill in this slice.
522        crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
523        self.inner
524            .db
525            .persist_metadata()
526            .map_err(|err| RedDBError::Internal(err.to_string()))?;
527        // Eagerly materialise the per-collection turbo state so the
528        // TurboExtent (when the store is paged) is reserved at CREATE
529        // time rather than on the first INSERT. Failures are swallowed
530        // — the lazy path in `turbo_state` retries on next access.
531        let _ = self.inner.db.turbo_state(&query.name);
532        self.invalidate_result_cache();
533
534        Ok(RuntimeQueryResult::ok_message(
535            raw_query.to_string(),
536            &format!("vector '{}' created", query.name),
537            "create",
538        ))
539    }
540
541    fn provision_vault_key_material(
542        &self,
543        collection: &str,
544        own_master_key: bool,
545    ) -> RedDBResult<()> {
546        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
547            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
548        })?;
549        if !auth_store.is_vault_backed() {
550            return Err(RedDBError::Query(
551                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
552            ));
553        }
554
555        if auth_store.vault_secret_key().is_none() {
556            let key = crate::auth::store::random_bytes(32);
557            auth_store
558                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
559                .map_err(|err| RedDBError::Query(err.to_string()))?;
560        }
561
562        if own_master_key {
563            let key = crate::auth::store::random_bytes(32);
564            auth_store
565                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
566                .map_err(|err| RedDBError::Query(err.to_string()))?;
567        }
568
569        Ok(())
570    }
571
572    /// Execute DROP TABLE
573    ///
574    /// Drops the collection and all its data from the store.
575    pub fn execute_drop_table(
576        &self,
577        raw_query: &str,
578        query: &DropTableQuery,
579    ) -> RedDBResult<RuntimeQueryResult> {
580        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
581        let store = self.inner.db.store();
582
583        if is_system_schema_name(&query.name) {
584            return Err(RedDBError::Query("system schema is read-only".to_string()));
585        }
586
587        let exists = store.get_collection(&query.name).is_some();
588        if !exists {
589            if query.if_exists {
590                return Ok(RuntimeQueryResult::ok_message(
591                    raw_query.to_string(),
592                    &format!("table '{}' does not exist", query.name),
593                    "drop",
594                ));
595            }
596            return Err(RedDBError::NotFound(format!(
597                "table '{}' not found",
598                query.name
599            )));
600        }
601        let actual =
602            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
603        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
604
605        // Emit 1 collection_dropped event before storage is wiped.
606        // Queue is preserved; subscription is removed with the contract below.
607        let final_count = store
608            .get_collection(&query.name)
609            .map(|manager| manager.query_all(|_| true).len() as u64)
610            .unwrap_or(0);
611        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
612            self,
613            &query.name,
614            final_count,
615        )?;
616
617        let orphaned_indices: Vec<String> = self
618            .inner
619            .index_store
620            .list_indices(&query.name)
621            .into_iter()
622            .map(|index| index.name)
623            .collect();
624        for name in &orphaned_indices {
625            self.inner.index_store.drop_index(name, &query.name);
626        }
627
628        store
629            .drop_collection(&query.name)
630            .map_err(|err| RedDBError::Internal(err.to_string()))?;
631        self.inner.db.invalidate_vector_index(&query.name);
632        self.inner.db.clear_collection_default_ttl_ms(&query.name);
633        self.inner
634            .db
635            .remove_collection_contract(&query.name)
636            .map_err(|err| RedDBError::Internal(err.to_string()))?;
637        self.clear_table_planner_stats(&query.name);
638        self.invalidate_result_cache();
639        // Issue #119: a dropped collection vanishes from every
640        // (tenant, role)'s visible-collections set. Auth is optional in
641        // embedded mode so guard the call.
642        if let Some(store) = self.inner.auth_store.read().clone() {
643            store.invalidate_visible_collections_cache();
644        }
645        self.inner
646            .db
647            .persist_metadata()
648            .map_err(|err| RedDBError::Internal(err.to_string()))?;
649        // Issue #120 — drop both the collection entries *and* every
650        // index entry that was scoped to this collection.  Dropping the
651        // collection wipes columns + collection-name + type-tags +
652        // index hits in one pass via `purge_collection_entries`, so
653        // the explicit `DropIndex` calls would be redundant.
654        self.schema_vocabulary_apply(
655            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
656                collection: query.name.clone(),
657            },
658        );
659
660        Ok(RuntimeQueryResult::ok_message(
661            raw_query.to_string(),
662            &format!("table '{}' dropped", query.name),
663            "drop",
664        ))
665    }
666
667    pub fn execute_drop_graph(
668        &self,
669        raw_query: &str,
670        query: &DropGraphQuery,
671    ) -> RedDBResult<RuntimeQueryResult> {
672        self.execute_drop_typed_collection(
673            raw_query,
674            &query.name,
675            query.if_exists,
676            CollectionModel::Graph,
677            "graph",
678        )
679    }
680
681    pub fn execute_drop_vector(
682        &self,
683        raw_query: &str,
684        query: &DropVectorQuery,
685    ) -> RedDBResult<RuntimeQueryResult> {
686        self.execute_drop_typed_collection(
687            raw_query,
688            &query.name,
689            query.if_exists,
690            CollectionModel::Vector,
691            "vector",
692        )
693    }
694
695    pub fn execute_drop_document(
696        &self,
697        raw_query: &str,
698        query: &DropDocumentQuery,
699    ) -> RedDBResult<RuntimeQueryResult> {
700        self.execute_drop_typed_collection(
701            raw_query,
702            &query.name,
703            query.if_exists,
704            CollectionModel::Document,
705            "document",
706        )
707    }
708
709    pub fn execute_drop_kv(
710        &self,
711        raw_query: &str,
712        query: &DropKvQuery,
713    ) -> RedDBResult<RuntimeQueryResult> {
714        let label = polymorphic_resolver::model_name(query.model);
715        self.execute_drop_typed_collection(
716            raw_query,
717            &query.name,
718            query.if_exists,
719            query.model,
720            label,
721        )
722    }
723
724    pub fn execute_drop_collection(
725        &self,
726        raw_query: &str,
727        query: &DropCollectionQuery,
728    ) -> RedDBResult<RuntimeQueryResult> {
729        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
730        if is_system_schema_name(&query.name) {
731            return Err(RedDBError::Query("system schema is read-only".to_string()));
732        }
733        let store = self.inner.db.store();
734        if store.get_collection(&query.name).is_none() {
735            if query.if_exists {
736                return Ok(RuntimeQueryResult::ok_message(
737                    raw_query.to_string(),
738                    &format!("collection '{}' does not exist", query.name),
739                    "drop",
740                ));
741            }
742            return Err(RedDBError::NotFound(format!(
743                "collection '{}' not found",
744                query.name
745            )));
746        }
747
748        let actual =
749            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
750        if let Some(expected) = query.model {
751            polymorphic_resolver::ensure_model_match(expected, actual)?;
752        }
753
754        match actual {
755            CollectionModel::Table => self.execute_drop_table(
756                raw_query,
757                &DropTableQuery {
758                    name: query.name.clone(),
759                    if_exists: query.if_exists,
760                },
761            ),
762            CollectionModel::TimeSeries => self.execute_drop_timeseries(
763                raw_query,
764                &DropTimeSeriesQuery {
765                    name: query.name.clone(),
766                    if_exists: query.if_exists,
767                },
768            ),
769            CollectionModel::Queue => self.execute_drop_queue(
770                raw_query,
771                &DropQueueQuery {
772                    name: query.name.clone(),
773                    if_exists: query.if_exists,
774                },
775            ),
776            CollectionModel::Graph => self.execute_drop_graph(
777                raw_query,
778                &DropGraphQuery {
779                    name: query.name.clone(),
780                    if_exists: query.if_exists,
781                },
782            ),
783            CollectionModel::Vector => self.execute_drop_vector(
784                raw_query,
785                &DropVectorQuery {
786                    name: query.name.clone(),
787                    if_exists: query.if_exists,
788                },
789            ),
790            CollectionModel::Document => self.execute_drop_document(
791                raw_query,
792                &DropDocumentQuery {
793                    name: query.name.clone(),
794                    if_exists: query.if_exists,
795                },
796            ),
797            CollectionModel::Kv => self.execute_drop_kv(
798                raw_query,
799                &DropKvQuery {
800                    name: query.name.clone(),
801                    if_exists: query.if_exists,
802                    model: CollectionModel::Kv,
803                },
804            ),
805            CollectionModel::Config => self.execute_drop_kv(
806                raw_query,
807                &DropKvQuery {
808                    name: query.name.clone(),
809                    if_exists: query.if_exists,
810                    model: CollectionModel::Config,
811                },
812            ),
813            CollectionModel::Vault => self.execute_drop_kv(
814                raw_query,
815                &DropKvQuery {
816                    name: query.name.clone(),
817                    if_exists: query.if_exists,
818                    model: CollectionModel::Vault,
819                },
820            ),
821            CollectionModel::Hll => self.execute_probabilistic_command(
822                raw_query,
823                &ProbabilisticCommand::DropHll {
824                    name: query.name.clone(),
825                    if_exists: query.if_exists,
826                },
827            ),
828            CollectionModel::Sketch => self.execute_probabilistic_command(
829                raw_query,
830                &ProbabilisticCommand::DropSketch {
831                    name: query.name.clone(),
832                    if_exists: query.if_exists,
833                },
834            ),
835            CollectionModel::Filter => self.execute_probabilistic_command(
836                raw_query,
837                &ProbabilisticCommand::DropFilter {
838                    name: query.name.clone(),
839                    if_exists: query.if_exists,
840                },
841            ),
842            CollectionModel::Metrics => self.execute_drop_typed_collection(
843                raw_query,
844                &query.name,
845                query.if_exists,
846                CollectionModel::Metrics,
847                "metrics",
848            ),
849            CollectionModel::Mixed => self.execute_drop_typed_collection(
850                raw_query,
851                &query.name,
852                query.if_exists,
853                CollectionModel::Mixed,
854                "collection",
855            ),
856        }
857    }
858
859    /// Issue #801 — guard `ALTER GRAPH ... ADD|DROP ANALYTICS` so analytics
860    /// lifecycle mutations only land on a collection declared as a graph. The
861    /// `<graph>.<output>` resolver only consults `analytics_config` on
862    /// graph-model contracts, so allowing the op on any other model would write
863    /// dead config the reader can never surface.
864    fn require_graph_for_analytics(&self, name: &str) -> RedDBResult<()> {
865        let is_graph = self
866            .inner
867            .db
868            .collection_contract(name)
869            .map(|c| c.declared_model == crate::catalog::CollectionModel::Graph)
870            .unwrap_or(false);
871        if !is_graph {
872            return Err(RedDBError::Query(format!(
873                "ALTER GRAPH ... ANALYTICS: '{name}' is not a graph collection"
874            )));
875        }
876        Ok(())
877    }
878
879    /// Execute ALTER TABLE
880    ///
881    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
882    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
883    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
884    pub fn execute_alter_table(
885        &self,
886        raw_query: &str,
887        query: &AlterTableQuery,
888    ) -> RedDBResult<RuntimeQueryResult> {
889        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
890        let store = self.inner.db.store();
891
892        // Verify the table exists.
893        if store.get_collection(&query.name).is_none() {
894            return Err(RedDBError::NotFound(format!(
895                "table '{}' not found",
896                query.name
897            )));
898        }
899
900        let mut messages = Vec::new();
901
902        // Collect column-level changes upfront for schema-change event emission below.
903        let fields_added: Vec<String> = query
904            .operations
905            .iter()
906            .filter_map(|op| {
907                if let AlterOperation::AddColumn(col) = op {
908                    Some(col.name.clone())
909                } else {
910                    None
911                }
912            })
913            .collect();
914        let fields_removed: Vec<String> = query
915            .operations
916            .iter()
917            .filter_map(|op| {
918                if let AlterOperation::DropColumn(name) = op {
919                    Some(name.clone())
920                } else {
921                    None
922                }
923            })
924            .collect();
925
926        for op in &query.operations {
927            match op {
928                AlterOperation::AddColumn(col) => {
929                    // Schema-on-read: column will be available on next insert.
930                    messages.push(format!("column '{}' added", col.name));
931                }
932                AlterOperation::DropColumn(name) => {
933                    messages.push(format!("column '{}' dropped", name));
934                }
935                AlterOperation::RenameColumn { from, to } => {
936                    messages.push(format!("column '{}' renamed to '{}'", from, to));
937                }
938                AlterOperation::AttachPartition { child, bound } => {
939                    // Persist child → parent binding in red_config so the
940                    // future planner-side pruner can enumerate children and
941                    // evaluate their bounds.
942                    store.set_config_tree(
943                        &format!("partition.{}.children.{}", query.name, child),
944                        &crate::serde_json::Value::String(bound.clone()),
945                    );
946                    messages.push(format!(
947                        "partition '{child}' attached to '{}' ({bound})",
948                        query.name
949                    ));
950                }
951                AlterOperation::DetachPartition { child } => {
952                    store.set_config_tree(
953                        &format!("partition.{}.children.{}", query.name, child),
954                        &crate::serde_json::Value::Null,
955                    );
956                    messages.push(format!(
957                        "partition '{child}' detached from '{}'",
958                        query.name
959                    ));
960                }
961                AlterOperation::EnableRowLevelSecurity => {
962                    self.inner
963                        .rls_enabled_tables
964                        .write()
965                        .insert(query.name.clone());
966                    // Persist flag so RLS survives restart via red_config.
967                    store.set_config_tree(
968                        &format!("rls.enabled.{}", query.name),
969                        &crate::serde_json::Value::Bool(true),
970                    );
971                    self.invalidate_plan_cache();
972                    messages.push(format!("row level security enabled on '{}'", query.name));
973                }
974                AlterOperation::DisableRowLevelSecurity => {
975                    self.inner.rls_enabled_tables.write().remove(&query.name);
976                    store.set_config_tree(
977                        &format!("rls.enabled.{}", query.name),
978                        &crate::serde_json::Value::Null,
979                    );
980                    self.invalidate_plan_cache();
981                    messages.push(format!("row level security disabled on '{}'", query.name));
982                }
983                // Phase 2.5.4: retrofit tenancy onto an existing table.
984                AlterOperation::EnableTenancy { column } => {
985                    store.set_config_tree(
986                        &format!("tenant_tables.{}.column", query.name),
987                        &crate::serde_json::Value::String(column.clone()),
988                    );
989                    self.register_tenant_table(&query.name, column);
990                    self.invalidate_plan_cache();
991                    messages.push(format!(
992                        "tenancy enabled on '{}' by column '{column}'",
993                        query.name
994                    ));
995                }
996                AlterOperation::DisableTenancy => {
997                    store.set_config_tree(
998                        &format!("tenant_tables.{}.column", query.name),
999                        &crate::serde_json::Value::Null,
1000                    );
1001                    self.unregister_tenant_table(&query.name);
1002                    self.invalidate_plan_cache();
1003                    messages.push(format!("tenancy disabled on '{}'", query.name));
1004                }
1005                AlterOperation::SetAppendOnly(on) => {
1006                    // Contract is the single source of truth for the
1007                    // UPDATE/DELETE parse-time guard. The flag lands
1008                    // below via `apply_alter_operations_to_contract`;
1009                    // here we only publish the human-readable message.
1010                    messages.push(format!(
1011                        "append_only {} on '{}'",
1012                        if *on { "enabled" } else { "disabled" },
1013                        query.name
1014                    ));
1015                }
1016                AlterOperation::SetVersioned(on) => {
1017                    // Opt the collection into (or out of) Git-for-Data.
1018                    // Persists a row in red_vcs_settings; next AS OF /
1019                    // merge / diff against this table honours the
1020                    // flag. Retroactive: existing row versions whose
1021                    // xmins are still pinned by commits become
1022                    // reachable via AS OF COMMIT immediately.
1023                    self.vcs_set_versioned(&query.name, *on)?;
1024                    messages.push(format!(
1025                        "versioned {} on '{}'",
1026                        if *on { "enabled" } else { "disabled" },
1027                        query.name
1028                    ));
1029                }
1030                AlterOperation::EnableEvents(subscription) => {
1031                    let mut subscription = subscription.clone();
1032                    subscription.source = query.name.clone();
1033                    validate_event_subscriptions(
1034                        self,
1035                        &query.name,
1036                        std::slice::from_ref(&subscription),
1037                    )?;
1038                    ensure_event_target_queue(self, &subscription.target_queue)?;
1039                    messages.push(format!(
1040                        "events enabled on '{}' to '{}'",
1041                        query.name, subscription.target_queue
1042                    ));
1043                }
1044                AlterOperation::DisableEvents => {
1045                    messages.push(format!("events disabled on '{}'", query.name));
1046                }
1047                AlterOperation::AddSubscription { name, descriptor } => {
1048                    let mut sub = descriptor.clone();
1049                    sub.name = name.clone();
1050                    sub.source = query.name.clone();
1051                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
1052                    ensure_event_target_queue(self, &sub.target_queue)?;
1053                    messages.push(format!(
1054                        "subscription '{}' added on '{}' to '{}'",
1055                        name, query.name, sub.target_queue
1056                    ));
1057                }
1058                AlterOperation::DropSubscription { name } => {
1059                    messages.push(format!(
1060                        "subscription '{}' dropped on '{}'",
1061                        name, query.name
1062                    ));
1063                }
1064                AlterOperation::AddSigner { pubkey } => {
1065                    // Issue #522 — admin-gated registry mutation. The
1066                    // standard DDL `check_write` above gates by role; we
1067                    // additionally verify the collection actually has a
1068                    // signed-writes registry installed so this isn't a
1069                    // covert way to retrofit one (use `CREATE COLLECTION
1070                    // ... SIGNED_BY (...)` for that).
1071                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1072                        return Err(RedDBError::Query(format!(
1073                            "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
1074                             recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
1075                            query.name
1076                        )));
1077                    }
1078                    let actor = crate::runtime::impl_core::current_user_projected()
1079                        .unwrap_or_else(|| "@system/alter".to_string());
1080                    let changed = crate::runtime::signed_writes_kind::add_signer(
1081                        &store,
1082                        &query.name,
1083                        *pubkey,
1084                        &actor,
1085                    );
1086                    messages.push(format!(
1087                        "signer {} on '{}'",
1088                        if changed { "added" } else { "already present" },
1089                        query.name
1090                    ));
1091                }
1092                AlterOperation::RevokeSigner { pubkey } => {
1093                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1094                        return Err(RedDBError::Query(format!(
1095                            "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1096                            query.name
1097                        )));
1098                    }
1099                    let actor = crate::runtime::impl_core::current_user_projected()
1100                        .unwrap_or_else(|| "@system/alter".to_string());
1101                    let changed = crate::runtime::signed_writes_kind::revoke_signer(
1102                        &store,
1103                        &query.name,
1104                        pubkey,
1105                        &actor,
1106                    );
1107                    messages.push(format!(
1108                        "signer {} on '{}'",
1109                        if changed {
1110                            "revoked"
1111                        } else {
1112                            "already revoked"
1113                        },
1114                        query.name
1115                    ));
1116                }
1117                AlterOperation::SetRetention { duration_ms } => {
1118                    // Issue #580 — validate that the collection has a
1119                    // timestamp column the lazy-on-scan filter can key
1120                    // off. Without one there is no anchor for "older
1121                    // than now - duration"; reject at ALTER time so the
1122                    // policy can never silently hide all rows.
1123                    let existing = self.inner.db.collection_contract(&query.name);
1124                    let has_ts_column = existing
1125                        .as_ref()
1126                        .map(retention_timestamp_column_exists)
1127                        .unwrap_or(false);
1128                    if !has_ts_column {
1129                        return Err(RedDBError::Query(format!(
1130                            "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1131                             column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1132                             or enable WITH timestamps = true before setting a \
1133                             retention policy",
1134                            query.name
1135                        )));
1136                    }
1137                    messages.push(format!(
1138                        "retention set to {duration_ms} ms on '{}'",
1139                        query.name
1140                    ));
1141                }
1142                AlterOperation::UnsetRetention => {
1143                    messages.push(format!("retention cleared on '{}'", query.name));
1144                }
1145                AlterOperation::AddAnalytics(views) => {
1146                    // Issue #801 — analytics lifecycle is graph-only. Reject the
1147                    // op on a non-graph collection so a misrouted ALTER fails
1148                    // loudly instead of silently writing analytics_config onto a
1149                    // model whose reader never resolves `<name>.<output>`.
1150                    self.require_graph_for_analytics(&query.name)?;
1151                    let existing = self.inner.db.collection_contract(&query.name);
1152                    let enabled: std::collections::BTreeSet<crate::catalog::AnalyticsOutput> =
1153                        existing
1154                            .as_ref()
1155                            .map(|c| c.analytics_config.iter().map(|v| v.output).collect())
1156                            .unwrap_or_default();
1157                    for view in views {
1158                        if enabled.contains(&view.output) {
1159                            // Idempotent: adding an already-enabled output is a
1160                            // no-op — no error, no duplicate state.
1161                            messages.push(format!(
1162                                "analytics '{}' already enabled on '{}'",
1163                                view.output.as_str(),
1164                                query.name
1165                            ));
1166                        } else {
1167                            messages.push(format!(
1168                                "analytics '{}' enabled on '{}'",
1169                                view.output.as_str(),
1170                                query.name
1171                            ));
1172                        }
1173                    }
1174                }
1175                AlterOperation::DropAnalytics(output) => {
1176                    self.require_graph_for_analytics(&query.name)?;
1177                    let enabled = self
1178                        .inner
1179                        .db
1180                        .collection_contract(&query.name)
1181                        .map(|c| c.analytics_config.iter().any(|v| v.output == *output))
1182                        .unwrap_or(false);
1183                    if !enabled {
1184                        // Dropping an output that was never enabled is a clean,
1185                        // explicit error (AC4) rather than a silent no-op.
1186                        return Err(RedDBError::Query(format!(
1187                            "ALTER GRAPH DROP ANALYTICS: analytics output '{}' is not enabled on graph '{}'",
1188                            output.as_str(),
1189                            query.name
1190                        )));
1191                    }
1192                    messages.push(format!(
1193                        "analytics '{}' disabled on '{}'",
1194                        output.as_str(),
1195                        query.name
1196                    ));
1197                }
1198            }
1199        }
1200
1201        let mut contract = self
1202            .inner
1203            .db
1204            .collection_contract(&query.name)
1205            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1206        apply_alter_operations_to_contract(&mut contract, &query.operations);
1207        contract.version = contract.version.saturating_add(1);
1208        contract.updated_at_unix_ms = current_unix_ms();
1209        self.inner
1210            .db
1211            .save_collection_contract(contract)
1212            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1213        // Issue #301 — emit OperatorEvent when column schema changes on a
1214        // collection that has active event subscriptions, so operators know
1215        // downstream consumers may see a different payload shape.
1216        if !fields_added.is_empty() || !fields_removed.is_empty() {
1217            let sub_names: Vec<String> = self
1218                .inner
1219                .db
1220                .collection_contract(&query.name)
1221                .map(|c| {
1222                    c.subscriptions
1223                        .iter()
1224                        .filter(|s| s.enabled)
1225                        .map(|s| s.name.clone())
1226                        .collect()
1227                })
1228                .unwrap_or_default();
1229            if !sub_names.is_empty() {
1230                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1231                    collection: query.name.clone(),
1232                    subscription_names: sub_names.join(", "),
1233                    fields_added: fields_added.join(", "),
1234                    fields_removed: fields_removed.join(", "),
1235                    lsn: self.cdc_current_lsn(),
1236                }
1237                .emit_global();
1238            }
1239        }
1240
1241        self.clear_table_planner_stats(&query.name);
1242        self.invalidate_result_cache();
1243        // Issue #120 — refresh the schema-vocabulary entries from the
1244        // post-ALTER contract. Drop+recreate inside the index keeps
1245        // the invalidation guarantee complete (no stale columns from
1246        // before an ALTER ... DROP COLUMN).
1247        let post_alter_columns: Vec<String> = self
1248            .inner
1249            .db
1250            .collection_contract(&query.name)
1251            .map(|contract| {
1252                contract
1253                    .declared_columns
1254                    .iter()
1255                    .map(|col| col.name.clone())
1256                    .collect()
1257            })
1258            .unwrap_or_default();
1259        self.schema_vocabulary_apply(
1260            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1261                collection: query.name.clone(),
1262                columns: post_alter_columns,
1263                type_tags: Vec::new(),
1264                description: None,
1265            },
1266        );
1267
1268        let message = if messages.is_empty() {
1269            format!("table '{}' altered (no operations)", query.name)
1270        } else {
1271            format!("table '{}' altered: {}", query.name, messages.join(", "))
1272        };
1273
1274        Ok(RuntimeQueryResult::ok_message(
1275            raw_query.to_string(),
1276            &message,
1277            "alter",
1278        ))
1279    }
1280
1281    /// Execute EXPLAIN ALTER FOR CREATE TABLE
1282    ///
1283    /// Pure read: computes the schema diff between the target table's
1284    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
1285    /// and returns it as SQL `ALTER TABLE` text (default) or structured
1286    /// JSON. Never mutates storage.
1287    pub fn execute_explain_alter(
1288        &self,
1289        raw_query: &str,
1290        query: &ExplainAlterQuery,
1291    ) -> RedDBResult<RuntimeQueryResult> {
1292        // Validate the target CREATE TABLE body so syntactically valid
1293        // but semantically broken targets (bad SQL types, duplicate
1294        // columns) are caught here rather than inside the diff engine.
1295        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1296
1297        let current_contract = self.inner.db.collection_contract(&query.target.name);
1298
1299        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1300            .as_ref()
1301            .map(|c| c.declared_columns.clone())
1302            .unwrap_or_default();
1303
1304        let diff = super::schema_diff::compute_column_diff(
1305            &query.target.name,
1306            &current_columns,
1307            &query.target.columns,
1308        );
1309
1310        let rendered = match query.format {
1311            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1312            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1313        };
1314
1315        let format_label = match query.format {
1316            ExplainFormat::Sql => "sql",
1317            ExplainFormat::Json => "json",
1318        };
1319
1320        let columns = vec![
1321            "table".to_string(),
1322            "format".to_string(),
1323            "diff".to_string(),
1324        ];
1325        let row = vec![
1326            ("table".to_string(), Value::text(query.target.name.clone())),
1327            ("format".to_string(), Value::text(format_label.to_string())),
1328            ("diff".to_string(), Value::text(rendered)),
1329        ];
1330
1331        Ok(RuntimeQueryResult::ok_records(
1332            raw_query.to_string(),
1333            columns,
1334            vec![row],
1335            "explain",
1336        ))
1337    }
1338
1339    /// Execute CREATE INDEX
1340    ///
1341    /// Registers a new index on a collection, builds it from existing data,
1342    /// and makes it available to the query executor for O(1) lookups.
1343    pub fn execute_create_index(
1344        &self,
1345        raw_query: &str,
1346        query: &CreateIndexQuery,
1347    ) -> RedDBResult<RuntimeQueryResult> {
1348        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1349        let store = self.inner.db.store();
1350
1351        // Verify the table exists
1352        let manager = store
1353            .get_collection(&query.table)
1354            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1355
1356        let method_kind = match query.method {
1357            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1358            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1359            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1360            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1361        };
1362
1363        // Extract fields from existing entities for indexing. Row
1364        // entities may arrive in either the "named" HashMap layout
1365        // (gRPC `BulkInsertBinary` path) OR the columnar layout
1366        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
1367        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
1368        // leaves `named == None`). Prior to this commit the columnar
1369        // branch returned an empty field list, so `CREATE INDEX` built
1370        // a zero-entity index over HTTP-inserted data even though the
1371        // data was queryable via `SELECT`.
1372        let entities = manager.query_all(|_| true);
1373        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1374            entities
1375                .iter()
1376                .map(|e| {
1377                    let fields = match &e.data {
1378                        crate::storage::EntityData::Row(row) => {
1379                            if let Some(ref named) = row.named {
1380                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1381                            } else if let Some(ref schema) = row.schema {
1382                                // Columnar layout — pair each column
1383                                // with its positional name from the
1384                                // shared schema Arc.
1385                                schema
1386                                    .iter()
1387                                    .zip(row.columns.iter())
1388                                    .map(|(k, v)| (k.clone(), v.clone()))
1389                                    .collect()
1390                            } else {
1391                                Vec::new()
1392                            }
1393                        }
1394                        crate::storage::EntityData::Node(node) => node
1395                            .properties
1396                            .iter()
1397                            .map(|(k, v)| (k.clone(), v.clone()))
1398                            .collect(),
1399                        _ => Vec::new(),
1400                    };
1401                    (e.id, fields)
1402                })
1403                .collect();
1404
1405        // Build the index
1406        let indexed_count = self
1407            .inner
1408            .index_store
1409            .create_index(
1410                &query.name,
1411                &query.table,
1412                &query.columns,
1413                method_kind,
1414                query.unique,
1415                &entity_fields,
1416            )
1417            .map_err(RedDBError::Internal)?;
1418
1419        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1420            &query.table,
1421            &entity_fields,
1422        );
1423        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1424        self.invalidate_plan_cache();
1425
1426        // Register metadata
1427        self.inner
1428            .index_store
1429            .register(super::index_store::RegisteredIndex {
1430                name: query.name.clone(),
1431                collection: query.table.clone(),
1432                columns: query.columns.clone(),
1433                method: method_kind,
1434                unique: query.unique,
1435            });
1436        self.persist_runtime_index_descriptor(super::index_store::RegisteredIndex {
1437            name: query.name.clone(),
1438            collection: query.table.clone(),
1439            columns: query.columns.clone(),
1440            method: method_kind,
1441            unique: query.unique,
1442        })?;
1443        // Issue #120 — surface the index name + indexed columns in
1444        // the schema-vocabulary so AskPipeline (#121) can resolve
1445        // "the email index" back to its collection.
1446        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1447            collection: query.table.clone(),
1448            index: query.name.clone(),
1449            columns: query.columns.clone(),
1450        });
1451
1452        let method_str = format!("{}", query.method);
1453        let unique_str = if query.unique { "unique " } else { "" };
1454        let cols = query.columns.join(", ");
1455
1456        Ok(RuntimeQueryResult::ok_message(
1457            raw_query.to_string(),
1458            &format!(
1459                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1460                unique_str, query.name, query.table, cols, method_str, indexed_count
1461            ),
1462            "create",
1463        ))
1464    }
1465
1466    /// Execute DROP INDEX
1467    ///
1468    /// Removes an index from a collection.
1469    pub fn execute_drop_index(
1470        &self,
1471        raw_query: &str,
1472        query: &DropIndexQuery,
1473    ) -> RedDBResult<RuntimeQueryResult> {
1474        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1475        let store = self.inner.db.store();
1476
1477        // Verify the table exists
1478        if store.get_collection(&query.table).is_none() {
1479            if query.if_exists {
1480                return Ok(RuntimeQueryResult::ok_message(
1481                    raw_query.to_string(),
1482                    &format!("table '{}' does not exist", query.table),
1483                    "drop",
1484                ));
1485            }
1486            return Err(RedDBError::NotFound(format!(
1487                "table '{}' not found",
1488                query.table
1489            )));
1490        }
1491
1492        // Remove from IndexStore
1493        self.inner.index_store.drop_index(&query.name, &query.table);
1494        self.persist_runtime_index_drop(&query.table, &query.name)?;
1495        self.invalidate_plan_cache();
1496        // Issue #120 — keep the schema-vocabulary index entry in sync.
1497        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1498            collection: query.table.clone(),
1499            index: query.name.clone(),
1500        });
1501
1502        Ok(RuntimeQueryResult::ok_message(
1503            raw_query.to_string(),
1504            &format!("index '{}' dropped from '{}'", query.name, query.table),
1505            "drop",
1506        ))
1507    }
1508
1509    fn execute_drop_typed_collection(
1510        &self,
1511        raw_query: &str,
1512        name: &str,
1513        if_exists: bool,
1514        expected_model: CollectionModel,
1515        label: &str,
1516    ) -> RedDBResult<RuntimeQueryResult> {
1517        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1518        if is_system_schema_name(name) {
1519            return Err(RedDBError::Query("system schema is read-only".to_string()));
1520        }
1521        let store = self.inner.db.store();
1522        if store.get_collection(name).is_none() {
1523            if if_exists {
1524                return Ok(RuntimeQueryResult::ok_message(
1525                    raw_query.to_string(),
1526                    &format!("{label} '{name}' does not exist"),
1527                    "drop",
1528                ));
1529            }
1530            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1531        }
1532
1533        let actual = self
1534            .inner
1535            .db
1536            .collection_contract(name)
1537            .map(|contract| contract.declared_model)
1538            .map(Ok)
1539            .unwrap_or_else(|| {
1540                polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())
1541            })?;
1542        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1543        self.drop_collection_storage(raw_query, name, label)
1544    }
1545
1546    pub fn execute_truncate(
1547        &self,
1548        raw_query: &str,
1549        query: &TruncateQuery,
1550    ) -> RedDBResult<RuntimeQueryResult> {
1551        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1552        if is_system_schema_name(&query.name) {
1553            return Err(RedDBError::Query("system schema is read-only".to_string()));
1554        }
1555
1556        let label = query
1557            .model
1558            .map(polymorphic_resolver::model_name)
1559            .unwrap_or("collection");
1560        let store = self.inner.db.store();
1561        if store.get_collection(&query.name).is_none() {
1562            if query.if_exists {
1563                return Ok(RuntimeQueryResult::ok_message(
1564                    raw_query.to_string(),
1565                    &format!("{label} '{}' does not exist", query.name),
1566                    "truncate",
1567                ));
1568            }
1569            return Err(RedDBError::NotFound(format!(
1570                "{label} '{}' not found",
1571                query.name
1572            )));
1573        }
1574
1575        let actual =
1576            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1577        if let Some(expected) = query.model {
1578            polymorphic_resolver::ensure_model_match(expected, actual)?;
1579        }
1580
1581        if actual == CollectionModel::Queue {
1582            return self.execute_queue_command(
1583                raw_query,
1584                &QueueCommand::Purge {
1585                    queue: query.name.clone(),
1586                },
1587            );
1588        }
1589
1590        // Count before wiping so we can emit the aggregated truncate event.
1591        let affected = self.truncate_collection_entities(&query.name)?;
1592        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1593        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1594        self.inner.db.invalidate_vector_index(&query.name);
1595        self.clear_table_planner_stats(&query.name);
1596        self.invalidate_result_cache();
1597
1598        Ok(RuntimeQueryResult::ok_message(
1599            raw_query.to_string(),
1600            &format!(
1601                "{affected} entities truncated from {label} '{}'",
1602                query.name
1603            ),
1604            "truncate",
1605        ))
1606    }
1607
1608    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1609        let store = self.inner.db.store();
1610        let Some(manager) = store.get_collection(name) else {
1611            return Ok(0);
1612        };
1613        let entities = manager.query_all(|_| true);
1614        if entities.is_empty() {
1615            return Ok(0);
1616        }
1617
1618        for entity in &entities {
1619            let fields = entity_index_fields(&entity.data);
1620            self.inner
1621                .index_store
1622                .index_entity_delete(name, entity.id, &fields)
1623                .map_err(RedDBError::Internal)?;
1624        }
1625
1626        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1627        let deleted_ids = store
1628            .delete_batch(name, &ids)
1629            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1630        for id in &deleted_ids {
1631            store.context_index().remove_entity(*id);
1632        }
1633        Ok(deleted_ids.len() as u64)
1634    }
1635
1636    fn drop_collection_storage(
1637        &self,
1638        raw_query: &str,
1639        name: &str,
1640        label: &str,
1641    ) -> RedDBResult<RuntimeQueryResult> {
1642        let store = self.inner.db.store();
1643
1644        // Emit 1 collection_dropped event before storage is wiped.
1645        // Queue is preserved; subscription is removed with the contract below.
1646        let final_count = store
1647            .get_collection(name)
1648            .map(|manager| manager.query_all(|_| true).len() as u64)
1649            .unwrap_or(0);
1650        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1651            self,
1652            name,
1653            final_count,
1654        )?;
1655
1656        let orphaned_indices: Vec<String> = self
1657            .inner
1658            .index_store
1659            .list_indices(name)
1660            .into_iter()
1661            .map(|index| index.name)
1662            .collect();
1663        for index_name in &orphaned_indices {
1664            self.inner.index_store.drop_index(index_name, name);
1665        }
1666
1667        store
1668            .drop_collection(name)
1669            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1670        self.inner.db.invalidate_vector_index(name);
1671        self.inner.db.clear_collection_default_ttl_ms(name);
1672        self.inner
1673            .db
1674            .remove_collection_contract(name)
1675            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1676        self.clear_table_planner_stats(name);
1677        self.invalidate_result_cache();
1678        if let Some(store) = self.inner.auth_store.read().clone() {
1679            store.invalidate_visible_collections_cache();
1680        }
1681        self.inner
1682            .db
1683            .persist_metadata()
1684            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1685        self.schema_vocabulary_apply(
1686            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1687                collection: name.to_string(),
1688            },
1689        );
1690
1691        Ok(RuntimeQueryResult::ok_message(
1692            raw_query.to_string(),
1693            &format!("{label} '{name}' dropped"),
1694            "drop",
1695        ))
1696    }
1697}
1698
/// Returns `true` when `name` belongs to the reserved system schema:
/// exactly `red`, anything under the `red.` namespace, or anything
/// starting with `__red_schema_`. The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(prefix))
}
1702
1703fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1704    match data {
1705        EntityData::Row(row) => {
1706            if let Some(ref named) = row.named {
1707                named
1708                    .iter()
1709                    .map(|(key, value)| (key.clone(), value.clone()))
1710                    .collect()
1711            } else if let Some(ref schema) = row.schema {
1712                schema
1713                    .iter()
1714                    .zip(row.columns.iter())
1715                    .map(|(key, value)| (key.clone(), value.clone()))
1716                    .collect()
1717            } else {
1718                Vec::new()
1719            }
1720        }
1721        EntityData::Node(node) => node
1722            .properties
1723            .iter()
1724            .map(|(key, value)| (key.clone(), value.clone()))
1725            .collect(),
1726        _ => Vec::new(),
1727    }
1728}
1729
1730fn collection_contract_from_create_table(
1731    query: &CreateTableQuery,
1732) -> RedDBResult<crate::physical::CollectionContract> {
1733    let now = current_unix_ms();
1734    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1735        .columns
1736        .iter()
1737        .map(declared_column_contract_from_ddl)
1738        .collect();
1739    if query.timestamps {
1740        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1741        // columns that the write path populates from
1742        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1743        declared_columns.push(crate::physical::DeclaredColumnContract {
1744            name: "created_at".to_string(),
1745            data_type: "BIGINT".to_string(),
1746            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1747            not_null: true,
1748            default: None,
1749            compress: None,
1750            unique: false,
1751            primary_key: false,
1752            enum_variants: Vec::new(),
1753            array_element: None,
1754            decimal_precision: None,
1755        });
1756        declared_columns.push(crate::physical::DeclaredColumnContract {
1757            name: "updated_at".to_string(),
1758            data_type: "BIGINT".to_string(),
1759            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1760            not_null: true,
1761            default: None,
1762            compress: None,
1763            unique: false,
1764            primary_key: false,
1765            enum_variants: Vec::new(),
1766            array_element: None,
1767            decimal_precision: None,
1768        });
1769    }
1770    Ok(crate::physical::CollectionContract {
1771        name: query.name.clone(),
1772        declared_model: crate::catalog::CollectionModel::Table,
1773        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1774        origin: crate::physical::ContractOrigin::Explicit,
1775        version: 1,
1776        created_at_unix_ms: now,
1777        updated_at_unix_ms: now,
1778        default_ttl_ms: query.default_ttl_ms,
1779        vector_dimension: None,
1780        vector_metric: None,
1781        context_index_fields: query.context_index_fields.clone(),
1782        declared_columns,
1783        table_def: Some(build_table_def_from_create_table(query)?),
1784        timestamps_enabled: query.timestamps,
1785        context_index_enabled: query.context_index_enabled
1786            || !query.context_index_fields.is_empty(),
1787        metrics_raw_retention_ms: None,
1788        metrics_rollup_policies: Vec::new(),
1789        metrics_tenant_identity: None,
1790        metrics_namespace: None,
1791        append_only: query.append_only,
1792        subscriptions: query.subscriptions.clone(),
1793        analytics_config: Vec::new(),
1794        session_key: None,
1795        session_gap_ms: None,
1796        retention_duration_ms: None,
1797        analytical_storage: None,
1798    })
1799}
1800
1801fn default_collection_contract_for_existing_table(
1802    name: &str,
1803) -> crate::physical::CollectionContract {
1804    let now = current_unix_ms();
1805    crate::physical::CollectionContract {
1806        name: name.to_string(),
1807        declared_model: crate::catalog::CollectionModel::Table,
1808        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1809        origin: crate::physical::ContractOrigin::Explicit,
1810        version: 0,
1811        created_at_unix_ms: now,
1812        updated_at_unix_ms: now,
1813        default_ttl_ms: None,
1814        vector_dimension: None,
1815        vector_metric: None,
1816        context_index_fields: Vec::new(),
1817        declared_columns: Vec::new(),
1818        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1819        timestamps_enabled: false,
1820        context_index_enabled: false,
1821        metrics_raw_retention_ms: None,
1822        metrics_rollup_policies: Vec::new(),
1823        metrics_tenant_identity: None,
1824        metrics_namespace: None,
1825        append_only: false,
1826        subscriptions: Vec::new(),
1827        analytics_config: Vec::new(),
1828        session_key: None,
1829        session_gap_ms: None,
1830        retention_duration_ms: None,
1831        analytical_storage: None,
1832    }
1833}
1834
1835fn keyed_collection_contract(
1836    name: &str,
1837    model: crate::catalog::CollectionModel,
1838    analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
1839) -> crate::physical::CollectionContract {
1840    let now = current_unix_ms();
1841    crate::physical::CollectionContract {
1842        name: name.to_string(),
1843        declared_model: model,
1844        schema_mode: crate::catalog::SchemaMode::Dynamic,
1845        origin: crate::physical::ContractOrigin::Explicit,
1846        version: 1,
1847        created_at_unix_ms: now,
1848        updated_at_unix_ms: now,
1849        default_ttl_ms: None,
1850        vector_dimension: None,
1851        vector_metric: None,
1852        context_index_fields: Vec::new(),
1853        declared_columns: Vec::new(),
1854        table_def: None,
1855        timestamps_enabled: false,
1856        context_index_enabled: false,
1857        metrics_raw_retention_ms: None,
1858        metrics_rollup_policies: Vec::new(),
1859        metrics_tenant_identity: None,
1860        metrics_namespace: None,
1861        append_only: false,
1862        subscriptions: Vec::new(),
1863        analytics_config,
1864        session_key: None,
1865        session_gap_ms: None,
1866        retention_duration_ms: None,
1867        analytical_storage: None,
1868    }
1869}
1870
1871fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1872    let now = current_unix_ms();
1873    crate::physical::CollectionContract {
1874        name: query.name.clone(),
1875        declared_model: crate::catalog::CollectionModel::Metrics,
1876        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1877        origin: crate::physical::ContractOrigin::Explicit,
1878        version: 1,
1879        created_at_unix_ms: now,
1880        updated_at_unix_ms: now,
1881        default_ttl_ms: query.default_ttl_ms,
1882        vector_dimension: None,
1883        vector_metric: None,
1884        context_index_fields: Vec::new(),
1885        declared_columns: Vec::new(),
1886        table_def: None,
1887        timestamps_enabled: false,
1888        context_index_enabled: false,
1889        metrics_raw_retention_ms: query.default_ttl_ms,
1890        metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1891        metrics_tenant_identity: Some(
1892            query
1893                .tenant_by
1894                .clone()
1895                .unwrap_or_else(|| "current_tenant".to_string()),
1896        ),
1897        metrics_namespace: Some("default".to_string()),
1898        append_only: true,
1899        subscriptions: Vec::new(),
1900        analytics_config: Vec::new(),
1901        session_key: None,
1902        session_gap_ms: None,
1903        retention_duration_ms: None,
1904        analytical_storage: None,
1905    }
1906}
1907
1908fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1909    let now = current_unix_ms();
1910    crate::physical::CollectionContract {
1911        name: query.name.clone(),
1912        declared_model: crate::catalog::CollectionModel::Vector,
1913        schema_mode: crate::catalog::SchemaMode::Dynamic,
1914        origin: crate::physical::ContractOrigin::Explicit,
1915        version: 1,
1916        created_at_unix_ms: now,
1917        updated_at_unix_ms: now,
1918        default_ttl_ms: None,
1919        vector_dimension: Some(query.dimension),
1920        vector_metric: Some(query.metric),
1921        context_index_fields: Vec::new(),
1922        declared_columns: Vec::new(),
1923        table_def: None,
1924        timestamps_enabled: false,
1925        context_index_enabled: false,
1926        metrics_raw_retention_ms: None,
1927        metrics_rollup_policies: Vec::new(),
1928        metrics_tenant_identity: None,
1929        metrics_namespace: None,
1930        append_only: false,
1931        subscriptions: Vec::new(),
1932        analytics_config: Vec::new(),
1933        session_key: None,
1934        session_gap_ms: None,
1935        retention_duration_ms: None,
1936        analytical_storage: None,
1937    }
1938}
1939
1940fn declared_column_contract_from_ddl(
1941    column: &CreateColumnDef,
1942) -> crate::physical::DeclaredColumnContract {
1943    crate::physical::DeclaredColumnContract {
1944        name: column.name.clone(),
1945        data_type: column.data_type.clone(),
1946        sql_type: Some(column.sql_type.clone()),
1947        not_null: column.not_null,
1948        default: column.default.clone(),
1949        compress: column.compress,
1950        unique: column.unique,
1951        primary_key: column.primary_key,
1952        enum_variants: column.enum_variants.clone(),
1953        array_element: column.array_element.clone(),
1954        decimal_precision: column.decimal_precision,
1955    }
1956}
1957
1958fn apply_alter_operations_to_contract(
1959    contract: &mut crate::physical::CollectionContract,
1960    operations: &[AlterOperation],
1961) {
1962    if contract.table_def.is_none() {
1963        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1964    }
1965    for operation in operations {
1966        match operation {
1967            AlterOperation::AddColumn(column) => {
1968                if !contract
1969                    .declared_columns
1970                    .iter()
1971                    .any(|existing| existing.name == column.name)
1972                {
1973                    contract
1974                        .declared_columns
1975                        .push(declared_column_contract_from_ddl(column));
1976                }
1977                if let Some(table_def) = contract.table_def.as_mut() {
1978                    if table_def.get_column(&column.name).is_none() {
1979                        if let Ok(column_def) = column_def_from_ddl(column) {
1980                            if column.primary_key {
1981                                table_def.primary_key.push(column.name.clone());
1982                                table_def.constraints.push(
1983                                    crate::storage::schema::Constraint::new(
1984                                        format!("pk_{}", column.name),
1985                                        crate::storage::schema::ConstraintType::PrimaryKey,
1986                                    )
1987                                    .on_columns(vec![column.name.clone()]),
1988                                );
1989                            }
1990                            if column.unique {
1991                                table_def.constraints.push(
1992                                    crate::storage::schema::Constraint::new(
1993                                        format!("uniq_{}", column.name),
1994                                        crate::storage::schema::ConstraintType::Unique,
1995                                    )
1996                                    .on_columns(vec![column.name.clone()]),
1997                                );
1998                            }
1999                            if column.not_null {
2000                                table_def.constraints.push(
2001                                    crate::storage::schema::Constraint::new(
2002                                        format!("not_null_{}", column.name),
2003                                        crate::storage::schema::ConstraintType::NotNull,
2004                                    )
2005                                    .on_columns(vec![column.name.clone()]),
2006                                );
2007                            }
2008                            table_def.columns.push(column_def);
2009                        }
2010                    }
2011                }
2012            }
2013            AlterOperation::DropColumn(name) => {
2014                contract
2015                    .declared_columns
2016                    .retain(|column| column.name != *name);
2017                if let Some(table_def) = contract.table_def.as_mut() {
2018                    if let Some(index) = table_def.column_index(name) {
2019                        table_def.columns.remove(index);
2020                    }
2021                    table_def.primary_key.retain(|column| column != name);
2022                    table_def.constraints.retain(|constraint| {
2023                        !constraint.columns.iter().any(|column| column == name)
2024                    });
2025                    table_def
2026                        .indexes
2027                        .retain(|index| !index.columns.iter().any(|column| column == name));
2028                }
2029            }
2030            AlterOperation::RenameColumn { from, to } => {
2031                if contract
2032                    .declared_columns
2033                    .iter()
2034                    .any(|column| column.name == *to)
2035                {
2036                    continue;
2037                }
2038                if let Some(column) = contract
2039                    .declared_columns
2040                    .iter_mut()
2041                    .find(|column| column.name == *from)
2042                {
2043                    column.name = to.clone();
2044                }
2045                if let Some(table_def) = contract.table_def.as_mut() {
2046                    if let Some(column) = table_def
2047                        .columns
2048                        .iter_mut()
2049                        .find(|column| column.name == *from)
2050                    {
2051                        column.name = to.clone();
2052                    }
2053                    for primary_key in &mut table_def.primary_key {
2054                        if *primary_key == *from {
2055                            *primary_key = to.clone();
2056                        }
2057                    }
2058                    for constraint in &mut table_def.constraints {
2059                        for column in &mut constraint.columns {
2060                            if *column == *from {
2061                                *column = to.clone();
2062                            }
2063                        }
2064                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
2065                            for column in ref_columns {
2066                                if *column == *from {
2067                                    *column = to.clone();
2068                                }
2069                            }
2070                        }
2071                    }
2072                    for index in &mut table_def.indexes {
2073                        for column in &mut index.columns {
2074                            if *column == *from {
2075                                *column = to.clone();
2076                            }
2077                        }
2078                    }
2079                }
2080            }
2081            // Partition ops don't touch the column contract — metadata is
2082            // persisted separately via `red_config.partition.*`.
2083            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
2084            // RLS toggles don't touch the column contract — flag is persisted
2085            // separately via `red_config.rls.enabled.{table}` and enforced
2086            // through the in-memory `rls_enabled_tables` set.
2087            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
2088            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
2089            // and are enforced through `tenant_tables` + RLS auto-policy.
2090            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
2091            AlterOperation::SetAppendOnly(on) => {
2092                contract.append_only = *on;
2093            }
2094            // VCS opt-in is persisted to red_vcs_settings by the
2095            // executor, not the contract — nothing to do here.
2096            AlterOperation::SetVersioned(_) => {}
2097            AlterOperation::EnableEvents(subscription) => {
2098                let mut subscription = subscription.clone();
2099                subscription.source = contract.name.clone();
2100                subscription.enabled = true;
2101                if let Some(existing) = contract
2102                    .subscriptions
2103                    .iter_mut()
2104                    .find(|existing| existing.target_queue == subscription.target_queue)
2105                {
2106                    *existing = subscription;
2107                } else {
2108                    contract.subscriptions.push(subscription);
2109                }
2110            }
2111            AlterOperation::DisableEvents => {
2112                for subscription in &mut contract.subscriptions {
2113                    subscription.enabled = false;
2114                }
2115            }
2116            AlterOperation::AddSubscription { name, descriptor } => {
2117                let mut sub = descriptor.clone();
2118                sub.name = name.clone();
2119                sub.source = contract.name.clone();
2120                sub.enabled = true;
2121                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
2122                {
2123                    *existing = sub;
2124                } else {
2125                    contract.subscriptions.push(sub);
2126                }
2127            }
2128            AlterOperation::DropSubscription { name } => {
2129                contract.subscriptions.retain(|s| s.name != *name);
2130            }
2131            // Signer registry mutations live in `red_config` outside the
2132            // contract surface — the executor applied them directly via
2133            // `signed_writes_kind::{add,revoke}_signer`. Nothing to fold
2134            // into the column-shaped contract.
2135            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
2136            AlterOperation::SetRetention { duration_ms } => {
2137                contract.retention_duration_ms = Some(*duration_ms);
2138            }
2139            AlterOperation::UnsetRetention => {
2140                contract.retention_duration_ms = None;
2141            }
2142            // Issue #801 — fold analytics lifecycle into the WAL-backed
2143            // `analytics_config` so the change is durable and the
2144            // `<graph>.<output>` resolver picks it up on the next read.
2145            AlterOperation::AddAnalytics(views) => {
2146                for view in views {
2147                    // Idempotent: skip outputs already enabled so a repeated
2148                    // ADD never duplicates state. The first declaration wins —
2149                    // re-declaring options requires DROP then ADD.
2150                    if !contract
2151                        .analytics_config
2152                        .iter()
2153                        .any(|existing| existing.output == view.output)
2154                    {
2155                        contract.analytics_config.push(view.clone());
2156                    }
2157                }
2158            }
2159            AlterOperation::DropAnalytics(output) => {
2160                contract
2161                    .analytics_config
2162                    .retain(|view| view.output != *output);
2163            }
2164        }
2165    }
2166}
2167
2168/// Issue #580 — returns true if the contract carries at least one
2169/// column the retention filter can use as a timestamp anchor:
2170/// either `WITH timestamps = true` (auto `created_at` / `updated_at`)
2171/// or a user-declared column with a temporal data_type.
2172pub(crate) fn retention_timestamp_column_exists(
2173    contract: &crate::physical::CollectionContract,
2174) -> bool {
2175    if contract.timestamps_enabled {
2176        return true;
2177    }
2178    if matches!(
2179        contract.declared_model,
2180        crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
2181    ) {
2182        // Time-series and metrics collections carry an intrinsic
2183        // timestamp axis on every row even without a declared column;
2184        // retention has a natural anchor.
2185        return true;
2186    }
2187    contract
2188        .declared_columns
2189        .iter()
2190        .any(|column| is_temporal_data_type(&column.data_type))
2191}
2192
/// Returns `true` when `data_type` names one of the temporal column types
/// recognised as a retention anchor.
///
/// The comparison ignores ASCII case and is otherwise exact: surrounding
/// whitespace or type parameters make the name not match.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPE_NAMES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];
    TEMPORAL_TYPE_NAMES
        .iter()
        .any(|known| data_type.eq_ignore_ascii_case(known))
}
2200
2201fn validate_event_subscriptions(
2202    runtime: &RedDBRuntime,
2203    source: &str,
2204    subscriptions: &[crate::catalog::SubscriptionDescriptor],
2205) -> RedDBResult<()> {
2206    for subscription in subscriptions
2207        .iter()
2208        .filter(|subscription| subscription.enabled)
2209    {
2210        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2211            return Err(RedDBError::Query(
2212                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2213            ));
2214        }
2215        validate_subscription_auth(runtime, source, subscription)?;
2216        if subscription.target_queue == source
2217            || subscription_would_create_cycle(
2218                &runtime.inner.db,
2219                source,
2220                &subscription.target_queue,
2221            )
2222        {
2223            return Err(RedDBError::Query(
2224                "subscription would create cycle".to_string(),
2225            ));
2226        }
2227        audit_subscription_redact_gap(runtime, source, subscription);
2228    }
2229    Ok(())
2230}
2231
2232fn validate_subscription_auth(
2233    runtime: &RedDBRuntime,
2234    source: &str,
2235    subscription: &crate::catalog::SubscriptionDescriptor,
2236) -> RedDBResult<()> {
2237    let auth_store = match runtime.inner.auth_store.read().clone() {
2238        Some(store) => store,
2239        None => return Ok(()),
2240    };
2241    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2242        Some(identity) => identity,
2243        None => return Ok(()),
2244    };
2245    let tenant = crate::runtime::impl_core::current_tenant();
2246    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2247
2248    if auth_store.iam_authorization_enabled() {
2249        let ctx = crate::auth::policies::EvalContext {
2250            principal_tenant: tenant.clone(),
2251            current_tenant: tenant.clone(),
2252            peer_ip: None,
2253            mfa_present: false,
2254            now_ms: crate::auth::now_ms(),
2255            principal_is_admin_role: role == crate::auth::Role::Admin,
2256            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2257            principal_is_platform_scoped: principal.tenant.is_none(),
2258        };
2259        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2260        if let Some(t) = tenant.as_deref() {
2261            source_resource = source_resource.with_tenant(t.to_string());
2262        }
2263        if !auth_store.check_policy_authz_with_role(
2264            &principal,
2265            "select",
2266            &source_resource,
2267            &ctx,
2268            role,
2269        ) {
2270            return Err(RedDBError::Query(format!(
2271                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2272                principal, source_resource.kind, source_resource.name
2273            )));
2274        }
2275
2276        let mut target_resource =
2277            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2278        if let Some(t) = tenant.as_deref() {
2279            target_resource = target_resource.with_tenant(t.to_string());
2280        }
2281        if !auth_store.check_policy_authz_with_role(
2282            &principal,
2283            "write",
2284            &target_resource,
2285            &ctx,
2286            role,
2287        ) {
2288            return Err(RedDBError::Query(format!(
2289                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2290                principal, target_resource.kind, target_resource.name
2291            )));
2292        }
2293        return Ok(());
2294    }
2295
2296    let ctx = crate::auth::privileges::AuthzContext {
2297        principal: &username,
2298        effective_role: role,
2299        tenant: tenant.as_deref(),
2300    };
2301    auth_store
2302        .check_grant(
2303            &ctx,
2304            crate::auth::privileges::Action::Select,
2305            &crate::auth::privileges::Resource::table_from_name(source),
2306        )
2307        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2308    auth_store
2309        .check_grant(
2310            &ctx,
2311            crate::auth::privileges::Action::Insert,
2312            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2313        )
2314        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2315    Ok(())
2316}
2317
2318fn audit_subscription_redact_gap(
2319    runtime: &RedDBRuntime,
2320    source: &str,
2321    subscription: &crate::catalog::SubscriptionDescriptor,
2322) {
2323    let auth_store = match runtime.inner.auth_store.read().clone() {
2324        Some(store) if store.iam_authorization_enabled() => store,
2325        _ => return,
2326    };
2327    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2328        Some(identity) => identity,
2329        None => return,
2330    };
2331    let tenant = crate::runtime::impl_core::current_tenant();
2332    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2333    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2334    if missing.is_empty() {
2335        return;
2336    }
2337
2338    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2339    tracing::warn!(
2340        target: "reddb::operator",
2341        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2342        source,
2343        subscription.target_queue,
2344        columns
2345    );
2346    let mut event = AuditEvent::builder("subscription_redact_gap")
2347        .principal(username)
2348        .source(AuditAuthSource::System)
2349        .resource(format!(
2350            "subscription:{}->{}",
2351            source, subscription.target_queue
2352        ))
2353        .outcome(Outcome::Success)
2354        .field(AuditFieldEscaper::field("source", source))
2355        .field(AuditFieldEscaper::field(
2356            "target_queue",
2357            subscription.target_queue.clone(),
2358        ))
2359        .field(AuditFieldEscaper::field(
2360            "subscription",
2361            subscription.name.clone(),
2362        ))
2363        .field(AuditFieldEscaper::field("columns", columns))
2364        .field(AuditFieldEscaper::field("role", role.as_str()));
2365    if let Some(t) = tenant {
2366        event = event.tenant(t);
2367    }
2368    runtime.inner.audit_log.record_event(event.build());
2369}
2370
2371fn subscription_redact_gap_columns(
2372    auth_store: &crate::auth::store::AuthStore,
2373    principal: &crate::auth::UserId,
2374    source: &str,
2375    subscription: &crate::catalog::SubscriptionDescriptor,
2376) -> BTreeSet<String> {
2377    let redacted: HashSet<String> = subscription
2378        .redact_fields
2379        .iter()
2380        .map(|field| field.to_ascii_lowercase())
2381        .collect();
2382    auth_store
2383        .effective_policies(principal)
2384        .iter()
2385        .flat_map(|policy| policy.statements.iter())
2386        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2387        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2388        .flat_map(|statement| statement.resources.iter())
2389        .filter_map(|resource| denied_column_for_source(resource, source))
2390        .filter(|column| !redact_covers_column(&redacted, source, column))
2391        .collect()
2392}
2393
2394fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2395    match pattern {
2396        crate::auth::policies::ActionPattern::Wildcard => true,
2397        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2398        crate::auth::policies::ActionPattern::Prefix(prefix) => {
2399            "select".len() > prefix.len() + 1
2400                && "select".starts_with(prefix)
2401                && "select".as_bytes()[prefix.len()] == b':'
2402        }
2403    }
2404}
2405
2406fn denied_column_for_source(
2407    resource: &crate::auth::policies::ResourcePattern,
2408    source: &str,
2409) -> Option<String> {
2410    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2411        return None;
2412    };
2413    if kind != "column" {
2414        return None;
2415    }
2416    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2417    (column.table_resource_name() == source).then_some(column.column)
2418}
2419
/// Decides whether the redaction set hides `column` of `source`.
///
/// The set covers the column when it contains the wildcard `*`, the
/// lower-cased column name, or the lower-cased `source.column` form.
/// Entries in `redacted` are compared as-is, so they are expected to be
/// lower-cased by the caller.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }

    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }

    let mut qualified = source.to_ascii_lowercase();
    qualified.push('.');
    qualified.push_str(&bare);
    redacted.contains(&qualified)
}
2425
2426fn subscription_would_create_cycle(
2427    db: &crate::storage::unified::devx::RedDB,
2428    source: &str,
2429    target: &str,
2430) -> bool {
2431    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2432    for contract in db.collection_contracts() {
2433        for subscription in contract
2434            .subscriptions
2435            .into_iter()
2436            .filter(|subscription| subscription.enabled)
2437        {
2438            graph
2439                .entry(subscription.source)
2440                .or_default()
2441                .push(subscription.target_queue);
2442        }
2443    }
2444    graph
2445        .entry(source.to_string())
2446        .or_default()
2447        .push(target.to_string());
2448
2449    let mut stack = vec![target.to_string()];
2450    let mut seen = HashSet::new();
2451    while let Some(node) = stack.pop() {
2452        if node == source {
2453            return true;
2454        }
2455        if !seen.insert(node.clone()) {
2456            continue;
2457        }
2458        if let Some(next) = graph.get(&node) {
2459            stack.extend(next.iter().cloned());
2460        }
2461    }
2462    false
2463}
2464
/// Crate-visible entry point for [`ensure_event_target_queue`].
///
/// Forwards both arguments unchanged and returns the delegate's result.
pub(crate) fn ensure_event_target_queue_pub(
    runtime: &RedDBRuntime,
    queue: &str,
) -> RedDBResult<()> {
    ensure_event_target_queue(runtime, queue)
}
2471
2472fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2473    let store = runtime.inner.db.store();
2474    if store.get_collection(queue).is_some() {
2475        return Ok(());
2476    }
2477    store
2478        .create_collection(queue)
2479        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2480    runtime
2481        .inner
2482        .db
2483        .save_collection_contract(event_queue_collection_contract(queue))
2484        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2485    store.set_config_tree(
2486        &format!("queue.{queue}.mode"),
2487        &crate::serde_json::Value::String("fanout".to_string()),
2488    );
2489    Ok(())
2490}
2491
2492fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2493    let now = current_unix_ms();
2494    crate::physical::CollectionContract {
2495        name: queue.to_string(),
2496        declared_model: crate::catalog::CollectionModel::Queue,
2497        schema_mode: crate::catalog::SchemaMode::Dynamic,
2498        origin: crate::physical::ContractOrigin::Implicit,
2499        version: 1,
2500        created_at_unix_ms: now,
2501        updated_at_unix_ms: now,
2502        default_ttl_ms: None,
2503        vector_dimension: None,
2504        vector_metric: None,
2505        context_index_fields: Vec::new(),
2506        declared_columns: Vec::new(),
2507        table_def: None,
2508        timestamps_enabled: false,
2509        context_index_enabled: false,
2510        metrics_raw_retention_ms: None,
2511        metrics_rollup_policies: Vec::new(),
2512        metrics_tenant_identity: None,
2513        metrics_namespace: None,
2514        append_only: true,
2515        subscriptions: Vec::new(),
2516        analytics_config: Vec::new(),
2517        session_key: None,
2518        session_gap_ms: None,
2519        retention_duration_ms: None,
2520        analytical_storage: None,
2521    }
2522}
2523
2524fn build_table_def_from_create_table(
2525    query: &CreateTableQuery,
2526) -> RedDBResult<crate::storage::schema::TableDef> {
2527    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2528    for column in &query.columns {
2529        if column.primary_key {
2530            table.primary_key.push(column.name.clone());
2531            table.constraints.push(
2532                crate::storage::schema::Constraint::new(
2533                    format!("pk_{}", column.name),
2534                    crate::storage::schema::ConstraintType::PrimaryKey,
2535                )
2536                .on_columns(vec![column.name.clone()]),
2537            );
2538        }
2539        if column.unique {
2540            table.constraints.push(
2541                crate::storage::schema::Constraint::new(
2542                    format!("uniq_{}", column.name),
2543                    crate::storage::schema::ConstraintType::Unique,
2544                )
2545                .on_columns(vec![column.name.clone()]),
2546            );
2547        }
2548        if column.not_null {
2549            table.constraints.push(
2550                crate::storage::schema::Constraint::new(
2551                    format!("not_null_{}", column.name),
2552                    crate::storage::schema::ConstraintType::NotNull,
2553                )
2554                .on_columns(vec![column.name.clone()]),
2555            );
2556        }
2557        table.columns.push(column_def_from_ddl(column)?);
2558    }
2559    // WITH timestamps = true: append the two runtime-managed columns
2560    // to the schema so resolved_contract_columns exposes them to the
2561    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
2562    // not-nullable; the write path auto-fills them.
2563    if query.timestamps {
2564        table.columns.push(
2565            crate::storage::schema::ColumnDef::new(
2566                "created_at".to_string(),
2567                crate::storage::schema::DataType::UnsignedInteger,
2568            )
2569            .not_null(),
2570        );
2571        table.columns.push(
2572            crate::storage::schema::ColumnDef::new(
2573                "updated_at".to_string(),
2574                crate::storage::schema::DataType::UnsignedInteger,
2575            )
2576            .not_null(),
2577        );
2578        table.constraints.push(
2579            crate::storage::schema::Constraint::new(
2580                "not_null_created_at".to_string(),
2581                crate::storage::schema::ConstraintType::NotNull,
2582            )
2583            .on_columns(vec!["created_at".to_string()]),
2584        );
2585        table.constraints.push(
2586            crate::storage::schema::Constraint::new(
2587                "not_null_updated_at".to_string(),
2588                crate::storage::schema::ConstraintType::NotNull,
2589            )
2590            .on_columns(vec!["updated_at".to_string()]),
2591        );
2592    }
2593    table
2594        .validate()
2595        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2596    Ok(table)
2597}
2598
2599fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2600    let data_type = resolve_declared_data_type(&column.data_type)
2601        .map_err(|err| RedDBError::Query(err.to_string()))?;
2602    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2603    if column.not_null {
2604        column_def = column_def.not_null();
2605    }
2606    if let Some(default) = &column.default {
2607        column_def = column_def.with_default(default.as_bytes().to_vec());
2608    }
2609    if column.compress.unwrap_or(0) > 0 {
2610        column_def = column_def.compressed();
2611    }
2612    if !column.enum_variants.is_empty() {
2613        column_def = column_def.with_variants(column.enum_variants.clone());
2614    }
2615    if let Some(precision) = column.decimal_precision {
2616        column_def = column_def.with_precision(precision);
2617    }
2618    if let Some(element_type) = &column.array_element {
2619        column_def = column_def.with_element_type(
2620            resolve_declared_data_type(element_type)
2621                .map_err(|err| RedDBError::Query(err.to_string()))?,
2622        );
2623    }
2624    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2625    if column.unique {
2626        column_def = column_def.with_metadata("unique", "true");
2627    }
2628    if column.primary_key {
2629        column_def = column_def.with_metadata("primary_key", "true");
2630    }
2631    Ok(column_def)
2632}
2633
/// Milliseconds elapsed since the Unix epoch according to the system
/// clock, or `0` when the clock reads earlier than the epoch.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2640
2641#[cfg(test)]
2642mod tests {
2643    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2644    use crate::auth::store::{AuthStore, PrincipalRef};
2645    use crate::auth::UserId;
2646    use crate::auth::{AuthConfig, Role};
2647    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2648    use crate::storage::schema::Value;
2649    use crate::{RedDBOptions, RedDBRuntime};
2650    use std::sync::Arc;
2651
2652    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2653        Policy {
2654            id: id.to_string(),
2655            version: 1,
2656            tenant: None,
2657            created_at: 0,
2658            updated_at: 0,
2659            statements: vec![Statement {
2660                sid: None,
2661                effect: Effect::Allow,
2662                actions: vec![ActionPattern::Exact(action.to_string())],
2663                resources: vec![ResourcePattern::Exact {
2664                    kind: "collection".to_string(),
2665                    name: collection.to_string(),
2666                }],
2667                condition: None,
2668            }],
2669        }
2670    }
2671
2672    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2673        let store = Arc::new(AuthStore::new(AuthConfig::default()));
2674        *rt.inner.auth_store.write() = Some(store.clone());
2675        store
2676    }
2677
2678    #[test]
2679    fn drop_denied_without_iam_policy() {
2680        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2681        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2682        let store = wire_auth_store(&rt);
2683        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
2684        let select_only = Policy {
2685            id: "select-only".to_string(),
2686            version: 1,
2687            tenant: None,
2688            created_at: 0,
2689            updated_at: 0,
2690            statements: vec![Statement {
2691                sid: None,
2692                effect: Effect::Allow,
2693                actions: vec![ActionPattern::Exact("select".to_string())],
2694                resources: vec![ResourcePattern::Wildcard],
2695                condition: None,
2696            }],
2697        };
2698        store.put_policy_internal(select_only).unwrap();
2699        let alice = UserId::from_parts(None, "alice");
2700        store
2701            .attach_policy(PrincipalRef::User(alice), "select-only")
2702            .unwrap();
2703        set_current_auth_identity("alice".to_string(), Role::Write);
2704        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2705        clear_current_auth_identity();
2706        assert!(
2707            format!("{err}").contains("denied by IAM policy"),
2708            "got: {err}"
2709        );
2710    }
2711
2712    #[test]
2713    fn drop_allowed_with_explicit_iam_policy() {
2714        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2715        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2716        let store = wire_auth_store(&rt);
2717        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2718        store.put_policy_internal(policy).unwrap();
2719        let bob = UserId::from_parts(None, "bob");
2720        store
2721            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2722            .unwrap();
2723        set_current_auth_identity("bob".to_string(), Role::Write);
2724        rt.execute_query("DROP TABLE bar").unwrap();
2725        clear_current_auth_identity();
2726    }
2727
2728    #[test]
2729    fn drop_allowed_with_wildcard_iam_policy() {
2730        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2731        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2732        let store = wire_auth_store(&rt);
2733        let policy = Policy {
2734            id: "allow-drop-all".to_string(),
2735            version: 1,
2736            tenant: None,
2737            created_at: 0,
2738            updated_at: 0,
2739            statements: vec![Statement {
2740                sid: None,
2741                effect: Effect::Allow,
2742                actions: vec![ActionPattern::Exact("drop".to_string())],
2743                resources: vec![ResourcePattern::Wildcard],
2744                condition: None,
2745            }],
2746        };
2747        store.put_policy_internal(policy).unwrap();
2748        let carl = UserId::from_parts(None, "carl");
2749        store
2750            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2751            .unwrap();
2752        set_current_auth_identity("carl".to_string(), Role::Write);
2753        rt.execute_query("DROP TABLE baz").unwrap();
2754        clear_current_auth_identity();
2755    }
2756
2757    #[test]
2758    fn truncate_denied_without_iam_policy() {
2759        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2760        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2761        let store = wire_auth_store(&rt);
2762        // Acceptance #2 (#712 / S5A): in `policy_only` mode a
2763        // principal with no matching policy is denied even if their
2764        // role would have permitted the action. The pre-#712 default
2765        // of "no policy → deny" only holds under `policy_only`, so
2766        // pin the mode explicitly here so the assertion holds
2767        // regardless of the construction-time default.
2768        store
2769            .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
2770        // A policy exists (IAM active) but gives no truncate right.
2771        let select_only = Policy {
2772            id: "select-only-2".to_string(),
2773            version: 1,
2774            tenant: None,
2775            created_at: 0,
2776            updated_at: 0,
2777            statements: vec![Statement {
2778                sid: None,
2779                effect: Effect::Allow,
2780                actions: vec![ActionPattern::Exact("select".to_string())],
2781                resources: vec![ResourcePattern::Wildcard],
2782                condition: None,
2783            }],
2784        };
2785        store.put_policy_internal(select_only).unwrap();
2786        let dana = UserId::from_parts(None, "dana");
2787        store
2788            .attach_policy(PrincipalRef::User(dana), "select-only-2")
2789            .unwrap();
2790        set_current_auth_identity("dana".to_string(), Role::Write);
2791        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2792        clear_current_auth_identity();
2793        assert!(
2794            format!("{err}").contains("denied by IAM policy"),
2795            "got: {err}"
2796        );
2797    }
2798
2799    #[test]
2800    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2801        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2802        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2803            .unwrap();
2804        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2805            .unwrap();
2806        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2807            .unwrap();
2808
2809        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2810        assert_eq!(truncated.statement_type, "truncate");
2811        assert_eq!(truncated.affected_rows, 0);
2812
2813        let empty = rt.execute_query("SELECT id FROM users").unwrap();
2814        assert!(empty.result.records.is_empty());
2815
2816        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2817            .unwrap();
2818        let selected = rt
2819            .execute_query("SELECT name FROM users WHERE id = 3")
2820            .unwrap();
2821        let name = selected.result.records[0].get("name").unwrap();
2822        assert_eq!(name, &Value::text("cy"));
2823        assert!(rt.db().collection_contract("users").is_some());
2824        assert!(rt
2825            .inner
2826            .index_store
2827            .list_indices("users")
2828            .iter()
2829            .any(|index| index.name == "idx_users_id"));
2830    }
2831
2832    #[test]
2833    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2834        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2835        rt.execute_query("CREATE QUEUE tasks").unwrap();
2836        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2837
2838        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2839        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2840
2841        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2842        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2843        assert_eq!(
2844            len.result.records[0].get("len"),
2845            Some(&Value::UnsignedInteger(0))
2846        );
2847    }
2848
2849    #[test]
2850    fn truncate_system_schema_is_read_only() {
2851        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2852        let err = rt
2853            .execute_query("TRUNCATE COLLECTION red.collections")
2854            .unwrap_err();
2855        assert!(format!("{err}").contains("system schema is read-only"));
2856    }
2857
2858    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────
2859
2860    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2861        let result = rt
2862            .execute_query(&format!("QUEUE PEEK {queue} 100"))
2863            .expect("peek queue");
2864        result
2865            .result
2866            .records
2867            .iter()
2868            .map(
2869                |record| match record.get("payload").expect("payload column") {
2870                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2871                    other => panic!("expected JSON queue payload, got {other:?}"),
2872                },
2873            )
2874            .collect()
2875    }
2876
2877    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
2878    /// `truncate` event, not one delete event per row.
2879    #[test]
2880    fn truncate_event_enabled_table_emits_single_truncate_event() {
2881        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2882        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2883            .unwrap();
2884        rt.execute_query(
2885            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2886        )
2887        .unwrap();
2888
2889        // Drain the 3 insert events so we start clean.
2890        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2891
2892        rt.execute_query("TRUNCATE TABLE users").unwrap();
2893
2894        let events = queue_payloads(&rt, "users_events");
2895        // Must be exactly 1 truncate event, not 3 delete events.
2896        assert_eq!(
2897            events.len(),
2898            1,
2899            "expected 1 truncate event, got {}",
2900            events.len()
2901        );
2902        let ev = events[0].as_object().expect("event is object");
2903        assert_eq!(
2904            ev.get("op").and_then(crate::json::Value::as_str),
2905            Some("truncate")
2906        );
2907        assert_eq!(
2908            ev.get("collection").and_then(crate::json::Value::as_str),
2909            Some("users")
2910        );
2911        assert_eq!(
2912            ev.get("entities_count")
2913                .and_then(crate::json::Value::as_u64),
2914            Some(3)
2915        );
2916        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2917        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2918        assert!(ev
2919            .get("event_id")
2920            .and_then(crate::json::Value::as_str)
2921            .is_some_and(|s| !s.is_empty()));
2922    }
2923
/// Truncating a table that was created without any `WITH EVENTS` clause
/// must simply succeed and leave the table empty.
#[test]
fn truncate_no_events_collection_emits_nothing() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    // Create a plain table (no EVENTS subscription), seed it, then truncate.
    for statement in [
        "CREATE TABLE plain (id INT, val TEXT)",
        "INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')",
        "TRUNCATE TABLE plain",
    ] {
        runtime.execute_query(statement).unwrap();
    }
    // There is no queue to inspect here; the only observable effect is
    // that the table no longer returns any rows.
    let remaining = runtime.execute_query("SELECT id FROM plain").unwrap();
    assert!(remaining.result.records.is_empty());
}
2938
/// Dropping an event-enabled table publishes exactly one
/// `collection_dropped` event to the target queue. The queue must still be
/// readable after the drop, while the source table can no longer be queried.
#[test]
fn drop_event_enabled_table_emits_single_collection_dropped_event() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events",
        "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')",
        // Pop the insert events so only the drop event is left afterwards.
        "QUEUE POP users_events COUNT 10",
        "DROP TABLE users",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    // The target queue outlives the source table and holds the single event.
    let events = queue_payloads(&runtime, "users_events");
    let delivered = events.len();
    assert_eq!(
        delivered, 1,
        "expected 1 collection_dropped event, got {}",
        delivered
    );
    let dropped = events[0].as_object().expect("event is object");

    let op = dropped.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("collection_dropped"));

    let collection = dropped
        .get("collection")
        .and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));

    // Two rows were inserted before the drop.
    let final_count = dropped
        .get("final_entities_count")
        .and_then(crate::json::Value::as_u64);
    assert_eq!(final_count, Some(2));

    // Envelope metadata: numeric `ts` / `lsn` and a non-empty `event_id`.
    for numeric_key in ["ts", "lsn"] {
        assert!(dropped
            .get(numeric_key)
            .and_then(crate::json::Value::as_u64)
            .is_some());
    }
    let event_id = dropped
        .get("event_id")
        .and_then(crate::json::Value::as_str);
    assert!(matches!(event_id, Some(id) if !id.is_empty()));

    // Querying the dropped table must now fail with an error naming it.
    let lookup_error = runtime.execute_query("SELECT id FROM users").unwrap_err();
    assert!(
        lookup_error.to_string().contains("users"),
        "expected not-found error"
    );
}
2991
/// Dropping a table that has no event subscription must succeed, and the
/// table must be unreachable afterwards.
#[test]
fn drop_no_events_collection_emits_nothing() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE plain (id INT, val TEXT)",
        "INSERT INTO plain (id, val) VALUES (1, 'a')",
        "DROP TABLE plain",
    ] {
        runtime.execute_query(statement).unwrap();
    }
    // The drop went through; a follow-up SELECT errors and names the table.
    let lookup_error = runtime.execute_query("SELECT id FROM plain").unwrap_err();
    assert!(lookup_error.to_string().contains("plain"));
}
3006
3007    // ── #297: ops_filter + WHERE filter ────────────────────────────────────
3008
/// With `WITH EVENTS (INSERT)` only inserts are published: a subsequent
/// UPDATE and DELETE on the same row must not add events to the queue.
#[test]
fn ops_filter_insert_only_ignores_update_and_delete() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
        "INSERT INTO items (id, val) VALUES (1, 'a')",
        "UPDATE items SET val = 'b' WHERE id = 1",
        "DELETE FROM items WHERE id = 1",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&runtime, "items_events");
    // Of the three mutations, only the INSERT is allowed through the filter.
    let delivered = events.len();
    assert_eq!(delivered, 1, "expected 1 insert event, got {}", delivered);

    let first = events[0].as_object().unwrap();
    let op = first.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));
}
3040
/// With `WITH EVENTS WHERE status = 'active'` an insert produces an event
/// only when the inserted row satisfies the predicate.
#[test]
fn where_filter_skips_rows_that_do_not_match() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
        // Matches the predicate: expected to publish an event.
        "INSERT INTO users (id, status) VALUES (1, 'active')",
        // Does not match the predicate: expected to be filtered out.
        "INSERT INTO users (id, status) VALUES (2, 'inactive')",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&runtime, "users_events");
    let delivered = events.len();
    assert_eq!(
        delivered, 1,
        "expected 1 event (only active), got {}",
        delivered
    );

    let event = events[0].as_object().unwrap();
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));

    // The published row image is the active one.
    let after_image = event.get("after").unwrap().as_object().unwrap();
    let status = after_image
        .get("status")
        .and_then(crate::json::Value::as_str);
    assert_eq!(status, Some("active"));
}
3075
/// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` applies both the
/// operation filter and the row predicate: of the four mutations below only
/// the insert of an active row is expected to reach the queue.
#[test]
fn ops_filter_and_where_filter_combined() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
        // INSERT of an active row: event expected.
        "INSERT INTO items (id, status) VALUES (1, 'active')",
        // INSERT of an inactive row: predicate rejects it.
        "INSERT INTO items (id, status) VALUES (2, 'inactive')",
        // UPDATE that leaves row 1 inactive: after-state fails the predicate.
        "UPDATE items SET status = 'inactive' WHERE id = 1",
        // DELETE is not listed in the ops filter at all.
        "DELETE FROM items WHERE id = 2",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    // `events` keeps its name because the assertion message captures it.
    let events = queue_payloads(&runtime, "items_events");
    assert_eq!(
        events.len(),
        1,
        "expected 1 event, got {}: {events:?}",
        events.len()
    );

    let survivor = events[0].as_object().unwrap();
    let op = survivor.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));
}
3114
/// For DELETE events the `WHERE` predicate is checked against the row as it
/// was before deletion: deleting an active row publishes an event, deleting
/// an inactive row does not.
#[test]
fn where_filter_on_delete_checks_before_state() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
        "INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')",
        // Row 1 was active before deletion: event expected.
        "DELETE FROM users WHERE id = 1",
        // Row 2 was inactive before deletion: no event expected.
        "DELETE FROM users WHERE id = 2",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    let events = queue_payloads(&runtime, "users_events");
    let delivered = events.len();
    assert_eq!(delivered, 1, "expected 1 delete event, got {}", delivered);

    let event = events[0].as_object().unwrap();
    let op = event.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("delete"));
}
3145
3146    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────
3147
/// `ALTER TABLE … ADD COLUMN` on an event-enabled table must succeed, record
/// the new column in the collection contract, and leave the subscription
/// enabled.
#[test]
fn alter_add_column_on_event_enabled_table_succeeds() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for statement in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events",
        // Must not error: OperatorEvent emission is best-effort and tests
        // run without a global sink.
        "ALTER TABLE users ADD COLUMN phone TEXT",
    ] {
        runtime.execute_query(statement).unwrap();
    }

    let contract = runtime.db().collection_contract("users").unwrap();

    // The added column shows up among the declared columns.
    let has_phone = contract
        .declared_columns
        .iter()
        .any(|column| column.name == "phone");
    assert!(has_phone, "phone column should be in contract");

    // The alter did not disable the event subscription.
    let still_subscribed = contract
        .subscriptions
        .iter()
        .any(|subscription| subscription.enabled);
    assert!(still_subscribed, "subscription should remain enabled");
}
3169
/// Column and non-column ALTERs on an event-enabled table must both succeed:
/// `DROP COLUMN` removes the column from the contract, and
/// `ENABLE ROW LEVEL SECURITY` leaves the event subscription enabled.
#[test]
fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query(
        "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
    )
    .unwrap();

    // DROP COLUMN — exercises the schema-change event path, must not error.
    rt.execute_query("ALTER TABLE items DROP COLUMN secret")
        .unwrap();
    let after_drop = rt.db().collection_contract("items").unwrap();
    assert!(
        !after_drop
            .declared_columns
            .iter()
            .any(|c| c.name == "secret"),
        "secret column should be removed"
    );

    // ENABLE RLS — non-column op, no schema-change event (coverage).
    rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
        .unwrap();

    // Re-read the contract: the value fetched above was obtained before the
    // RLS alter, so asserting on it would not check the post-RLS state.
    let after_rls = rt.db().collection_contract("items").unwrap();
    assert!(
        after_rls.subscriptions.iter().any(|s| s.enabled),
        "subscription should remain enabled"
    );
}
3196
/// Slice G (#675) — `CREATE VECTOR` must leave the new collection
/// turbo-marked (as reported by `vector_turbo_kind::is_turbo`) and with a
/// turbo state available from the database handle.
#[test]
fn create_vector_marks_collection_as_turbo_baseline() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE VECTOR embeddings DIM 4")
        .unwrap();

    let store = runtime.db().store();
    let turbo_marked = crate::runtime::vector_turbo_kind::is_turbo(&store, "embeddings");
    assert!(
        turbo_marked,
        "new vector collections must be turbo-marked baseline"
    );

    let turbo_state = runtime.db().turbo_state("embeddings");
    assert!(
        turbo_state.is_some(),
        "turbo_state must materialise after CREATE VECTOR"
    );
}
3216
/// A plain `CREATE TABLE` must not pick up the turbo marker, nor have any
/// turbo state; this guards against the vector baseline leaking onto other
/// collection kinds.
#[test]
fn create_table_does_not_mark_turbo() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime.execute_query("CREATE TABLE plain (id INT)").unwrap();

    let store = runtime.db().store();
    let turbo_marked = crate::runtime::vector_turbo_kind::is_turbo(&store, "plain");
    assert!(
        !turbo_marked,
        "non-vector collections must not gain the turbo marker"
    );

    let turbo_state = runtime.db().turbo_state("plain");
    assert!(turbo_state.is_none());
}
3231
/// The explicit `CREATE COLLECTION … KIND vector.turbo` form must still
/// yield a turbo-marked collection with a turbo state.
#[test]
fn create_collection_kind_vector_turbo_still_marked() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE COLLECTION turbo_v KIND vector.turbo DIM 4")
        .unwrap();

    let store = runtime.db().store();
    let turbo_marked = crate::runtime::vector_turbo_kind::is_turbo(&store, "turbo_v");
    assert!(turbo_marked);

    let turbo_state = runtime.db().turbo_state("turbo_v");
    assert!(turbo_state.is_some());
}
3245}