Skip to main content

reddb_server/runtime/
impl_ddl.rs

1//! DDL execution: CREATE TABLE, DROP TABLE, ALTER TABLE via SQL AST
2//!
3//! Translates DDL statements into collection-level operations on the
4//! underlying `UnifiedStore`.  RedDB uses a flexible schema-on-read
5//! model, so column definitions are advisory metadata rather than
6//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault KV key under which a vault collection's own master key
/// is stored: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        crate::reserved_fields::ensure_no_reserved_public_item_fields(
37            query.columns.iter().map(|column| column.name.as_str()),
38            &format!("table '{}'", query.name),
39        )?;
40        // Check if the collection already exists.
41        let exists = store.get_collection(&query.name).is_some();
42        if exists {
43            if query.if_not_exists {
44                return Ok(RuntimeQueryResult::ok_message(
45                    raw_query.to_string(),
46                    &format!("table '{}' already exists", query.name),
47                    "create",
48                ));
49            }
50            return Err(RedDBError::Query(format!(
51                "table '{}' already exists",
52                query.name
53            )));
54        }
55
56        // Build and validate the contract before mutating storage so invalid
57        // SQL types / duplicate columns do not leave partial side effects.
58        let contract = collection_contract_from_create_table(query)?;
59        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60        // Create the collection.
61        store
62            .create_collection(&query.name)
63            .map_err(|err| RedDBError::Internal(err.to_string()))?;
64        for subscription in &contract.subscriptions {
65            ensure_event_target_queue(self, &subscription.target_queue)?;
66        }
67        if let Some(default_ttl_ms) = query.default_ttl_ms {
68            self.inner
69                .db
70                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71        }
72        self.inner
73            .db
74            .save_collection_contract(contract)
75            .map_err(|err| RedDBError::Internal(err.to_string()))?;
76        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77            store.set_config_tree(
78                &format!("red.collection_tenants.{}", query.name),
79                &crate::serde_json::Value::String(tenant_id),
80            );
81        }
82        self.inner
83            .db
84            .persist_metadata()
85            .map_err(|err| RedDBError::Internal(err.to_string()))?;
86        self.refresh_table_planner_stats(&query.name);
87        self.invalidate_result_cache();
88        // Issue #120 — feed the create into the schema-vocabulary
89        // reverse index so AskPipeline (#121) sees this collection.
90        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91        self.schema_vocabulary_apply(
92            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93                collection: query.name.clone(),
94                columns,
95                type_tags: Vec::new(),
96                description: None,
97            },
98        );
99        // Partition metadata (Phase 2.2 PG parity).
100        //
101        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
102        // clause, stamp the partition config into `red_config` under
103        // `partition.{table}.{by,column}`. Children are registered separately
104        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
105        if let Some(spec) = &query.partition_by {
106            let kind_str = match spec.kind {
107                crate::storage::query::ast::PartitionKind::Range => "range",
108                crate::storage::query::ast::PartitionKind::List => "list",
109                crate::storage::query::ast::PartitionKind::Hash => "hash",
110            };
111            store.set_config_tree(
112                &format!("partition.{}.by", query.name),
113                &crate::serde_json::Value::String(kind_str.to_string()),
114            );
115            store.set_config_tree(
116                &format!("partition.{}.column", query.name),
117                &crate::serde_json::Value::String(spec.column.clone()),
118            );
119        }
120
121        // Table-scoped multi-tenancy (Phase 2.5.4).
122        //
123        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
124        //   1. Persists the `tenant_tables.{table}.column` marker so
125        //      INSERTs can auto-fill and future opens re-hydrate.
126        //   2. Registers the table in the in-memory `tenant_tables`
127        //      HashMap used by the DML auto-fill path.
128        //   3. Installs an implicit RLS policy equivalent to
129        //      `USING (col = CURRENT_TENANT())` across all actions.
130        //   4. Flips `rls_enabled_tables` on so the policy applies.
131        if let Some(col) = &query.tenant_by {
132            store.set_config_tree(
133                &format!("tenant_tables.{}.column", query.name),
134                &crate::serde_json::Value::String(col.clone()),
135            );
136            self.register_tenant_table(&query.name, col);
137        }
138
139        let ttl_suffix = query
140            .default_ttl_ms
141            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142            .unwrap_or_default();
143
144        let tenant_suffix = query
145            .tenant_by
146            .as_ref()
147            .map(|col| format!(" (tenant-scoped by {col})"))
148            .unwrap_or_default();
149
150        Ok(RuntimeQueryResult::ok_message(
151            raw_query.to_string(),
152            &format!(
153                "table '{}' created{}{}",
154                query.name, ttl_suffix, tenant_suffix
155            ),
156            "create",
157        ))
158    }
159
160    fn execute_create_keyed_collection(
161        &self,
162        raw_query: &str,
163        query: &CreateTableQuery,
164    ) -> RedDBResult<RuntimeQueryResult> {
165        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166        if is_system_schema_name(&query.name) {
167            return Err(RedDBError::Query("system schema is read-only".to_string()));
168        }
169        let store = self.inner.db.store();
170        let label = polymorphic_resolver::model_name(query.collection_model);
171        if store.get_collection(&query.name).is_some() {
172            if query.if_not_exists {
173                return Ok(RuntimeQueryResult::ok_message(
174                    raw_query.to_string(),
175                    &format!("{label} '{}' already exists", query.name),
176                    "create",
177                ));
178            }
179            return Err(RedDBError::Query(format!(
180                "{label} '{}' already exists",
181                query.name
182            )));
183        }
184
185        store
186            .create_collection(&query.name)
187            .map_err(|err| RedDBError::Internal(err.to_string()))?;
188        if query.collection_model == CollectionModel::Vault {
189            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190            let key_scope = if query.vault_own_master_key {
191                "own"
192            } else {
193                "cluster"
194            };
195            store.set_config_tree(
196                &format!("red.vault.{}.key_scope", query.name),
197                &crate::serde_json::Value::String(key_scope.to_string()),
198            );
199            store.set_config_tree(
200                &format!("red.vault.{}.status", query.name),
201                &crate::serde_json::Value::String("sealed".to_string()),
202            );
203        }
204        if query.collection_model == CollectionModel::Metrics {
205            for spec in &query.metrics_rollup_policies {
206                let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207                    .ok_or_else(|| {
208                    RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209                })?;
210                if policy.source != "raw" {
211                    return Err(RedDBError::Query(format!(
212                        "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213                        spec
214                    )));
215                }
216                if !matches!(
217                    policy.aggregation.as_str(),
218                    "avg" | "sum" | "min" | "max" | "count"
219                ) {
220                    return Err(RedDBError::Query(format!(
221                        "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222                        spec
223                    )));
224                }
225            }
226            if let Some(raw_retention_ms) = query.default_ttl_ms {
227                self.inner
228                    .db
229                    .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230                store.set_config_tree(
231                    &format!("red.metrics.{}.raw_retention_ms", query.name),
232                    &crate::serde_json::Value::Number(raw_retention_ms as f64),
233                );
234            }
235            let tenant_identity = query
236                .tenant_by
237                .clone()
238                .unwrap_or_else(|| "current_tenant".to_string());
239            store.set_config_tree(
240                &format!("red.metrics.{}.tenant_identity", query.name),
241                &crate::serde_json::Value::String(tenant_identity),
242            );
243            store.set_config_tree(
244                &format!("red.metrics.{}.namespace", query.name),
245                &crate::serde_json::Value::String("default".to_string()),
246            );
247            if !query.metrics_rollup_policies.is_empty() {
248                store.set_config_tree(
249                    &format!("red.metrics.{}.rollup_policies", query.name),
250                    &crate::serde_json::Value::Array(
251                        query
252                            .metrics_rollup_policies
253                            .iter()
254                            .cloned()
255                            .map(crate::serde_json::Value::String)
256                            .collect(),
257                    ),
258                );
259            }
260        }
261        let contract = if query.collection_model == CollectionModel::Metrics {
262            metrics_collection_contract(query)
263        } else {
264            keyed_collection_contract(
265                &query.name,
266                query.collection_model,
267                query.analytics_config.clone(),
268            )
269        };
270        self.inner
271            .db
272            .save_collection_contract(contract)
273            .map_err(|err| RedDBError::Internal(err.to_string()))?;
274        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
275            store.set_config_tree(
276                &format!("red.collection_tenants.{}", query.name),
277                &crate::serde_json::Value::String(tenant_id),
278            );
279        }
280        self.inner
281            .db
282            .persist_metadata()
283            .map_err(|err| RedDBError::Internal(err.to_string()))?;
284        self.invalidate_result_cache();
285
286        Ok(RuntimeQueryResult::ok_message(
287            raw_query.to_string(),
288            &format!("{label} '{}' created", query.name),
289            "create",
290        ))
291    }
292
293    /// Bootstrap a system-owned graph collection declared `WITH ANALYTICS`
294    /// (issue #803). The SQL DDL path refuses `red.*` names via
295    /// [`is_system_schema_name`], so built-ins like `red.topology.cluster`
296    /// cannot be created through `CREATE GRAPH`; this method drives the same
297    /// contract-build / save / persist path directly, bypassing only that
298    /// guard. Idempotent: a no-op when the collection already exists (its
299    /// analytics contract is recovered from the WAL on restart), so it is safe
300    /// to call unconditionally on every boot.
301    pub fn ensure_system_graph_with_analytics(
302        &self,
303        name: &str,
304        outputs: &[crate::catalog::AnalyticsOutput],
305    ) -> RedDBResult<()> {
306        let store = self.inner.db.store();
307        if store.get_collection(name).is_some() {
308            return Ok(());
309        }
310        let analytics_config = outputs
311            .iter()
312            .map(|output| crate::catalog::AnalyticsViewDescriptor {
313                output: *output,
314                algorithm: None,
315                resolution: None,
316                max_iterations: None,
317                tolerance: None,
318            })
319            .collect();
320        store
321            .create_collection(name)
322            .map_err(|err| RedDBError::Internal(err.to_string()))?;
323        let contract = keyed_collection_contract(
324            name,
325            crate::catalog::CollectionModel::Graph,
326            analytics_config,
327        );
328        self.inner
329            .db
330            .save_collection_contract(contract)
331            .map_err(|err| RedDBError::Internal(err.to_string()))?;
332        self.inner
333            .db
334            .persist_metadata()
335            .map_err(|err| RedDBError::Internal(err.to_string()))?;
336        self.invalidate_result_cache();
337        Ok(())
338    }
339
340    pub fn execute_create_collection(
341        &self,
342        raw_query: &str,
343        query: &CreateCollectionQuery,
344    ) -> RedDBResult<RuntimeQueryResult> {
345        let model = match query.kind.as_str() {
346            "graph" => CollectionModel::Graph,
347            "document" => CollectionModel::Document,
348            "metrics" => CollectionModel::Metrics,
349            "vector.turbo" => {
350                let dimension = query.vector_dimension.ok_or_else(|| {
351                    RedDBError::Query(
352                        "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
353                    )
354                })?;
355                let create = CreateVectorQuery {
356                    name: query.name.clone(),
357                    dimension,
358                    metric: query
359                        .vector_metric
360                        .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
361                    if_not_exists: query.if_not_exists,
362                };
363                let result = self.execute_create_vector(raw_query, &create)?;
364                // Issue #693 — durable kind marker that distinguishes
365                // this collection from the legacy `vector` path on
366                // every restart. Runtime state (TurboQuantIndex +
367                // TurboExtent) is lazily materialised by
368                // `RedDB::turbo_state` on first INSERT/SEARCH so the
369                // marker is the only thing that needs to survive.
370                let store = self.inner.db.store();
371                crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
372                self.inner
373                    .db
374                    .persist_metadata()
375                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
376                // Eagerly construct the state so the TurboExtent (if
377                // any) is reserved at creation time rather than on
378                // first INSERT. Errors are swallowed: the lazy path in
379                // `turbo_state` will retry on the next access.
380                let _ = self.inner.db.turbo_state(&query.name);
381                return Ok(result);
382            }
383            // KIND blockchain — issue #523 foundation: stored on top of a
384            // Table-shaped collection. The `chain` marker + reserved-column
385            // discipline make the difference. Schema validation, conflict
386            // retries, and `verify_chain` come in later iterations.
387            "blockchain" => CollectionModel::Table,
388            other => {
389                return Err(RedDBError::Query(format!(
390                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
391                )));
392            }
393        };
394        let create = CreateTableQuery {
395            collection_model: model,
396            name: query.name.clone(),
397            columns: Vec::new(),
398            if_not_exists: query.if_not_exists,
399            default_ttl_ms: None,
400            metrics_rollup_policies: Vec::new(),
401            context_index_fields: Vec::new(),
402            context_index_enabled: false,
403            timestamps: false,
404            partition_by: None,
405            tenant_by: None,
406            append_only: false,
407            subscriptions: Vec::new(),
408            analytics_config: Vec::new(),
409            vault_own_master_key: false,
410        };
411        let result = self.execute_create_table(raw_query, &create)?;
412        if query.kind == "blockchain" {
413            self.install_blockchain_kind(&query.name)?;
414        }
415        // Issue #522 — wire `SIGNED_BY (...)` into the runtime. The parser
416        // already produces a validated 32-byte pubkey list; installing
417        // the registry stamps the per-collection signer set into
418        // `red_config` so the INSERT path can load it cheaply.
419        if !query.allowed_signers.is_empty() {
420            let actor = crate::runtime::impl_core::current_user_projected()
421                .unwrap_or_else(|| "@system/create-collection".to_string());
422            crate::runtime::signed_writes_kind::install(
423                &self.inner.db.store(),
424                &query.name,
425                &query.allowed_signers,
426                &actor,
427            );
428        }
429        Ok(result)
430    }
431
432    /// Stamp `red.collection.{name}.kind = "chain"` and append the genesis
433    /// row. Idempotent against `IF NOT EXISTS`: if the collection already
434    /// has a row at height 0 we leave it.
435    fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
436        use crate::runtime::blockchain_kind;
437        use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
438        use std::sync::Arc;
439
440        let store = self.inner.db.store();
441        blockchain_kind::mark_as_chain(&store, name);
442
443        let existing_tip = blockchain_kind::chain_tip(&store, name);
444        if existing_tip.height.is_some() {
445            return Ok(());
446        }
447
448        let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
449        let named: std::collections::HashMap<String, crate::storage::schema::Value> =
450            fields.into_iter().collect();
451        let entity = UnifiedEntity::new(
452            EntityId::new(0),
453            EntityKind::TableRow {
454                table: Arc::from(name),
455                row_id: 0,
456            },
457            EntityData::Row(RowData {
458                columns: Vec::new(),
459                named: Some(named),
460                schema: None,
461            }),
462        );
463        store
464            .insert_auto(name, entity)
465            .map_err(|err| RedDBError::Internal(err.to_string()))?;
466        // #524: prime the in-memory tip cache so the chain-tip endpoint and
467        // subsequent INSERTs don't have to scan the collection to find genesis.
468        if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
469            self.inner
470                .chain_tip_cache
471                .lock()
472                .insert(name.to_string(), tip);
473        }
474        Ok(())
475    }
476
477    pub fn execute_create_vector(
478        &self,
479        raw_query: &str,
480        query: &CreateVectorQuery,
481    ) -> RedDBResult<RuntimeQueryResult> {
482        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
483        if is_system_schema_name(&query.name) {
484            return Err(RedDBError::Query("system schema is read-only".to_string()));
485        }
486        let store = self.inner.db.store();
487        if store.get_collection(&query.name).is_some() {
488            if query.if_not_exists {
489                return Ok(RuntimeQueryResult::ok_message(
490                    raw_query.to_string(),
491                    &format!("vector '{}' already exists", query.name),
492                    "create",
493                ));
494            }
495            return Err(RedDBError::Query(format!(
496                "vector '{}' already exists",
497                query.name
498            )));
499        }
500
501        store
502            .create_collection(&query.name)
503            .map_err(|err| RedDBError::Internal(err.to_string()))?;
504        self.inner
505            .db
506            .save_collection_contract(vector_collection_contract(query))
507            .map_err(|err| RedDBError::Internal(err.to_string()))?;
508        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
509            store.set_config_tree(
510                &format!("red.collection_tenants.{}", query.name),
511                &crate::serde_json::Value::String(tenant_id),
512            );
513        }
514        // Slice G (#675) — register `vector.turbo` as the built-in
515        // baseline for every newly-created vector collection. The
516        // marker is the durable signal `RedDB::turbo_state` reads to
517        // materialise the TurboQuantIndex + TurboExtent on first
518        // INSERT/SEARCH. Pre-existing vector collections were created
519        // without the marker and stay on the brute-force fallback path
520        // until they are explicitly migrated — there is no implicit
521        // backfill in this slice.
522        crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
523        self.inner
524            .db
525            .persist_metadata()
526            .map_err(|err| RedDBError::Internal(err.to_string()))?;
527        // Eagerly materialise the per-collection turbo state so the
528        // TurboExtent (when the store is paged) is reserved at CREATE
529        // time rather than on the first INSERT. Failures are swallowed
530        // — the lazy path in `turbo_state` retries on next access.
531        let _ = self.inner.db.turbo_state(&query.name);
532        self.invalidate_result_cache();
533
534        Ok(RuntimeQueryResult::ok_message(
535            raw_query.to_string(),
536            &format!("vector '{}' created", query.name),
537            "create",
538        ))
539    }
540
541    fn provision_vault_key_material(
542        &self,
543        collection: &str,
544        own_master_key: bool,
545    ) -> RedDBResult<()> {
546        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
547            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
548        })?;
549        if !auth_store.is_vault_backed() {
550            return Err(RedDBError::Query(
551                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
552            ));
553        }
554
555        if auth_store.vault_secret_key().is_none() {
556            let key = crate::auth::store::random_bytes(32);
557            auth_store
558                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
559                .map_err(|err| RedDBError::Query(err.to_string()))?;
560        }
561
562        if own_master_key {
563            let key = crate::auth::store::random_bytes(32);
564            auth_store
565                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
566                .map_err(|err| RedDBError::Query(err.to_string()))?;
567        }
568
569        Ok(())
570    }
571
572    /// Execute DROP TABLE
573    ///
574    /// Drops the collection and all its data from the store.
575    pub fn execute_drop_table(
576        &self,
577        raw_query: &str,
578        query: &DropTableQuery,
579    ) -> RedDBResult<RuntimeQueryResult> {
580        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
581        let store = self.inner.db.store();
582
583        if is_system_schema_name(&query.name) {
584            return Err(RedDBError::Query("system schema is read-only".to_string()));
585        }
586
587        let exists = store.get_collection(&query.name).is_some();
588        if !exists {
589            if query.if_exists {
590                return Ok(RuntimeQueryResult::ok_message(
591                    raw_query.to_string(),
592                    &format!("table '{}' does not exist", query.name),
593                    "drop",
594                ));
595            }
596            return Err(RedDBError::NotFound(format!(
597                "table '{}' not found",
598                query.name
599            )));
600        }
601        let actual =
602            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
603        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
604
605        // Emit 1 collection_dropped event before storage is wiped.
606        // Queue is preserved; subscription is removed with the contract below.
607        let final_count = store
608            .get_collection(&query.name)
609            .map(|manager| manager.query_all(|_| true).len() as u64)
610            .unwrap_or(0);
611        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
612            self,
613            &query.name,
614            final_count,
615        )?;
616
617        let orphaned_indices: Vec<String> = self
618            .inner
619            .index_store
620            .list_indices(&query.name)
621            .into_iter()
622            .map(|index| index.name)
623            .collect();
624        for name in &orphaned_indices {
625            self.inner.index_store.drop_index(name, &query.name);
626        }
627
628        store
629            .drop_collection(&query.name)
630            .map_err(|err| RedDBError::Internal(err.to_string()))?;
631        self.inner.db.invalidate_vector_index(&query.name);
632        self.inner.db.clear_collection_default_ttl_ms(&query.name);
633        self.inner
634            .db
635            .remove_collection_contract(&query.name)
636            .map_err(|err| RedDBError::Internal(err.to_string()))?;
637        self.clear_table_planner_stats(&query.name);
638        self.invalidate_result_cache();
639        // Issue #119: a dropped collection vanishes from every
640        // (tenant, role)'s visible-collections set. Auth is optional in
641        // embedded mode so guard the call.
642        if let Some(store) = self.inner.auth_store.read().clone() {
643            store.invalidate_visible_collections_cache();
644        }
645        self.inner
646            .db
647            .persist_metadata()
648            .map_err(|err| RedDBError::Internal(err.to_string()))?;
649        // Issue #120 — drop both the collection entries *and* every
650        // index entry that was scoped to this collection.  Dropping the
651        // collection wipes columns + collection-name + type-tags +
652        // index hits in one pass via `purge_collection_entries`, so
653        // the explicit `DropIndex` calls would be redundant.
654        self.schema_vocabulary_apply(
655            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
656                collection: query.name.clone(),
657            },
658        );
659
660        Ok(RuntimeQueryResult::ok_message(
661            raw_query.to_string(),
662            &format!("table '{}' dropped", query.name),
663            "drop",
664        ))
665    }
666
667    pub fn execute_drop_graph(
668        &self,
669        raw_query: &str,
670        query: &DropGraphQuery,
671    ) -> RedDBResult<RuntimeQueryResult> {
672        self.execute_drop_typed_collection(
673            raw_query,
674            &query.name,
675            query.if_exists,
676            CollectionModel::Graph,
677            "graph",
678        )
679    }
680
681    pub fn execute_drop_vector(
682        &self,
683        raw_query: &str,
684        query: &DropVectorQuery,
685    ) -> RedDBResult<RuntimeQueryResult> {
686        self.execute_drop_typed_collection(
687            raw_query,
688            &query.name,
689            query.if_exists,
690            CollectionModel::Vector,
691            "vector",
692        )
693    }
694
695    pub fn execute_drop_document(
696        &self,
697        raw_query: &str,
698        query: &DropDocumentQuery,
699    ) -> RedDBResult<RuntimeQueryResult> {
700        self.execute_drop_typed_collection(
701            raw_query,
702            &query.name,
703            query.if_exists,
704            CollectionModel::Document,
705            "document",
706        )
707    }
708
709    pub fn execute_drop_kv(
710        &self,
711        raw_query: &str,
712        query: &DropKvQuery,
713    ) -> RedDBResult<RuntimeQueryResult> {
714        let label = polymorphic_resolver::model_name(query.model);
715        self.execute_drop_typed_collection(
716            raw_query,
717            &query.name,
718            query.if_exists,
719            query.model,
720            label,
721        )
722    }
723
724    pub fn execute_drop_collection(
725        &self,
726        raw_query: &str,
727        query: &DropCollectionQuery,
728    ) -> RedDBResult<RuntimeQueryResult> {
729        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
730        if is_system_schema_name(&query.name) {
731            return Err(RedDBError::Query("system schema is read-only".to_string()));
732        }
733        let store = self.inner.db.store();
734        if store.get_collection(&query.name).is_none() {
735            if query.if_exists {
736                return Ok(RuntimeQueryResult::ok_message(
737                    raw_query.to_string(),
738                    &format!("collection '{}' does not exist", query.name),
739                    "drop",
740                ));
741            }
742            return Err(RedDBError::NotFound(format!(
743                "collection '{}' not found",
744                query.name
745            )));
746        }
747
748        let actual =
749            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
750        if let Some(expected) = query.model {
751            polymorphic_resolver::ensure_model_match(expected, actual)?;
752        }
753
754        match actual {
755            CollectionModel::Table => self.execute_drop_table(
756                raw_query,
757                &DropTableQuery {
758                    name: query.name.clone(),
759                    if_exists: query.if_exists,
760                },
761            ),
762            CollectionModel::TimeSeries => self.execute_drop_timeseries(
763                raw_query,
764                &DropTimeSeriesQuery {
765                    name: query.name.clone(),
766                    if_exists: query.if_exists,
767                },
768            ),
769            CollectionModel::Queue => self.execute_drop_queue(
770                raw_query,
771                &DropQueueQuery {
772                    name: query.name.clone(),
773                    if_exists: query.if_exists,
774                },
775            ),
776            CollectionModel::Graph => self.execute_drop_graph(
777                raw_query,
778                &DropGraphQuery {
779                    name: query.name.clone(),
780                    if_exists: query.if_exists,
781                },
782            ),
783            CollectionModel::Vector => self.execute_drop_vector(
784                raw_query,
785                &DropVectorQuery {
786                    name: query.name.clone(),
787                    if_exists: query.if_exists,
788                },
789            ),
790            CollectionModel::Document => self.execute_drop_document(
791                raw_query,
792                &DropDocumentQuery {
793                    name: query.name.clone(),
794                    if_exists: query.if_exists,
795                },
796            ),
797            CollectionModel::Kv => self.execute_drop_kv(
798                raw_query,
799                &DropKvQuery {
800                    name: query.name.clone(),
801                    if_exists: query.if_exists,
802                    model: CollectionModel::Kv,
803                },
804            ),
805            CollectionModel::Config => self.execute_drop_kv(
806                raw_query,
807                &DropKvQuery {
808                    name: query.name.clone(),
809                    if_exists: query.if_exists,
810                    model: CollectionModel::Config,
811                },
812            ),
813            CollectionModel::Vault => self.execute_drop_kv(
814                raw_query,
815                &DropKvQuery {
816                    name: query.name.clone(),
817                    if_exists: query.if_exists,
818                    model: CollectionModel::Vault,
819                },
820            ),
821            CollectionModel::Hll => self.execute_probabilistic_command(
822                raw_query,
823                &ProbabilisticCommand::DropHll {
824                    name: query.name.clone(),
825                    if_exists: query.if_exists,
826                },
827            ),
828            CollectionModel::Sketch => self.execute_probabilistic_command(
829                raw_query,
830                &ProbabilisticCommand::DropSketch {
831                    name: query.name.clone(),
832                    if_exists: query.if_exists,
833                },
834            ),
835            CollectionModel::Filter => self.execute_probabilistic_command(
836                raw_query,
837                &ProbabilisticCommand::DropFilter {
838                    name: query.name.clone(),
839                    if_exists: query.if_exists,
840                },
841            ),
842            CollectionModel::Metrics => self.execute_drop_typed_collection(
843                raw_query,
844                &query.name,
845                query.if_exists,
846                CollectionModel::Metrics,
847                "metrics",
848            ),
849            CollectionModel::Mixed => self.execute_drop_typed_collection(
850                raw_query,
851                &query.name,
852                query.if_exists,
853                CollectionModel::Mixed,
854                "collection",
855            ),
856        }
857    }
858
859    /// Issue #801 — guard `ALTER GRAPH ... ADD|DROP ANALYTICS` so analytics
860    /// lifecycle mutations only land on a collection declared as a graph. The
861    /// `<graph>.<output>` resolver only consults `analytics_config` on
862    /// graph-model contracts, so allowing the op on any other model would write
863    /// dead config the reader can never surface.
864    fn require_graph_for_analytics(&self, name: &str) -> RedDBResult<()> {
865        let is_graph = self
866            .inner
867            .db
868            .collection_contract(name)
869            .map(|c| c.declared_model == crate::catalog::CollectionModel::Graph)
870            .unwrap_or(false);
871        if !is_graph {
872            return Err(RedDBError::Query(format!(
873                "ALTER GRAPH ... ANALYTICS: '{name}' is not a graph collection"
874            )));
875        }
876        Ok(())
877    }
878
879    /// Execute ALTER TABLE
880    ///
881    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
882    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
883    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
884    pub fn execute_alter_table(
885        &self,
886        raw_query: &str,
887        query: &AlterTableQuery,
888    ) -> RedDBResult<RuntimeQueryResult> {
889        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
890        let store = self.inner.db.store();
891
892        // Verify the table exists.
893        if store.get_collection(&query.name).is_none() {
894            return Err(RedDBError::NotFound(format!(
895                "table '{}' not found",
896                query.name
897            )));
898        }
899
900        let mut messages = Vec::new();
901
902        // Collect column-level changes upfront for schema-change event emission below.
903        let fields_added: Vec<String> = query
904            .operations
905            .iter()
906            .filter_map(|op| {
907                if let AlterOperation::AddColumn(col) = op {
908                    Some(col.name.clone())
909                } else {
910                    None
911                }
912            })
913            .collect();
914        let fields_removed: Vec<String> = query
915            .operations
916            .iter()
917            .filter_map(|op| {
918                if let AlterOperation::DropColumn(name) = op {
919                    Some(name.clone())
920                } else {
921                    None
922                }
923            })
924            .collect();
925
926        for op in &query.operations {
927            match op {
928                AlterOperation::AddColumn(col) => {
929                    // Schema-on-read: column will be available on next insert.
930                    messages.push(format!("column '{}' added", col.name));
931                }
932                AlterOperation::DropColumn(name) => {
933                    messages.push(format!("column '{}' dropped", name));
934                }
935                AlterOperation::RenameColumn { from, to } => {
936                    messages.push(format!("column '{}' renamed to '{}'", from, to));
937                }
938                AlterOperation::AttachPartition { child, bound } => {
939                    // Persist child → parent binding in red_config so the
940                    // future planner-side pruner can enumerate children and
941                    // evaluate their bounds.
942                    store.set_config_tree(
943                        &format!("partition.{}.children.{}", query.name, child),
944                        &crate::serde_json::Value::String(bound.clone()),
945                    );
946                    messages.push(format!(
947                        "partition '{child}' attached to '{}' ({bound})",
948                        query.name
949                    ));
950                }
951                AlterOperation::DetachPartition { child } => {
952                    store.set_config_tree(
953                        &format!("partition.{}.children.{}", query.name, child),
954                        &crate::serde_json::Value::Null,
955                    );
956                    messages.push(format!(
957                        "partition '{child}' detached from '{}'",
958                        query.name
959                    ));
960                }
961                AlterOperation::EnableRowLevelSecurity => {
962                    self.inner
963                        .rls_enabled_tables
964                        .write()
965                        .insert(query.name.clone());
966                    // Persist flag so RLS survives restart via red_config.
967                    store.set_config_tree(
968                        &format!("rls.enabled.{}", query.name),
969                        &crate::serde_json::Value::Bool(true),
970                    );
971                    self.invalidate_plan_cache();
972                    messages.push(format!("row level security enabled on '{}'", query.name));
973                }
974                AlterOperation::DisableRowLevelSecurity => {
975                    self.inner.rls_enabled_tables.write().remove(&query.name);
976                    store.set_config_tree(
977                        &format!("rls.enabled.{}", query.name),
978                        &crate::serde_json::Value::Null,
979                    );
980                    self.invalidate_plan_cache();
981                    messages.push(format!("row level security disabled on '{}'", query.name));
982                }
983                // Phase 2.5.4: retrofit tenancy onto an existing table.
984                AlterOperation::EnableTenancy { column } => {
985                    store.set_config_tree(
986                        &format!("tenant_tables.{}.column", query.name),
987                        &crate::serde_json::Value::String(column.clone()),
988                    );
989                    self.register_tenant_table(&query.name, column);
990                    self.invalidate_plan_cache();
991                    messages.push(format!(
992                        "tenancy enabled on '{}' by column '{column}'",
993                        query.name
994                    ));
995                }
996                AlterOperation::DisableTenancy => {
997                    store.set_config_tree(
998                        &format!("tenant_tables.{}.column", query.name),
999                        &crate::serde_json::Value::Null,
1000                    );
1001                    self.unregister_tenant_table(&query.name);
1002                    self.invalidate_plan_cache();
1003                    messages.push(format!("tenancy disabled on '{}'", query.name));
1004                }
1005                AlterOperation::SetAppendOnly(on) => {
1006                    // Contract is the single source of truth for the
1007                    // UPDATE/DELETE parse-time guard. The flag lands
1008                    // below via `apply_alter_operations_to_contract`;
1009                    // here we only publish the human-readable message.
1010                    messages.push(format!(
1011                        "append_only {} on '{}'",
1012                        if *on { "enabled" } else { "disabled" },
1013                        query.name
1014                    ));
1015                }
1016                AlterOperation::SetVersioned(on) => {
1017                    // Opt the collection into (or out of) Git-for-Data.
1018                    // Persists a row in red_vcs_settings; next AS OF /
1019                    // merge / diff against this table honours the
1020                    // flag. Retroactive: existing row versions whose
1021                    // xmins are still pinned by commits become
1022                    // reachable via AS OF COMMIT immediately.
1023                    self.vcs_set_versioned(&query.name, *on)?;
1024                    messages.push(format!(
1025                        "versioned {} on '{}'",
1026                        if *on { "enabled" } else { "disabled" },
1027                        query.name
1028                    ));
1029                }
1030                AlterOperation::EnableEvents(subscription) => {
1031                    let mut subscription = subscription.clone();
1032                    subscription.source = query.name.clone();
1033                    validate_event_subscriptions(
1034                        self,
1035                        &query.name,
1036                        std::slice::from_ref(&subscription),
1037                    )?;
1038                    ensure_event_target_queue(self, &subscription.target_queue)?;
1039                    messages.push(format!(
1040                        "events enabled on '{}' to '{}'",
1041                        query.name, subscription.target_queue
1042                    ));
1043                }
1044                AlterOperation::DisableEvents => {
1045                    messages.push(format!("events disabled on '{}'", query.name));
1046                }
1047                AlterOperation::AddSubscription { name, descriptor } => {
1048                    let mut sub = descriptor.clone();
1049                    sub.name = name.clone();
1050                    sub.source = query.name.clone();
1051                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
1052                    ensure_event_target_queue(self, &sub.target_queue)?;
1053                    messages.push(format!(
1054                        "subscription '{}' added on '{}' to '{}'",
1055                        name, query.name, sub.target_queue
1056                    ));
1057                }
1058                AlterOperation::DropSubscription { name } => {
1059                    messages.push(format!(
1060                        "subscription '{}' dropped on '{}'",
1061                        name, query.name
1062                    ));
1063                }
1064                AlterOperation::AddSigner { pubkey } => {
1065                    // Issue #522 — admin-gated registry mutation. The
1066                    // standard DDL `check_write` above gates by role; we
1067                    // additionally verify the collection actually has a
1068                    // signed-writes registry installed so this isn't a
1069                    // covert way to retrofit one (use `CREATE COLLECTION
1070                    // ... SIGNED_BY (...)` for that).
1071                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1072                        return Err(RedDBError::Query(format!(
1073                            "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
1074                             recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
1075                            query.name
1076                        )));
1077                    }
1078                    let actor = crate::runtime::impl_core::current_user_projected()
1079                        .unwrap_or_else(|| "@system/alter".to_string());
1080                    let changed = crate::runtime::signed_writes_kind::add_signer(
1081                        &store,
1082                        &query.name,
1083                        *pubkey,
1084                        &actor,
1085                    );
1086                    messages.push(format!(
1087                        "signer {} on '{}'",
1088                        if changed { "added" } else { "already present" },
1089                        query.name
1090                    ));
1091                }
1092                AlterOperation::RevokeSigner { pubkey } => {
1093                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1094                        return Err(RedDBError::Query(format!(
1095                            "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1096                            query.name
1097                        )));
1098                    }
1099                    let actor = crate::runtime::impl_core::current_user_projected()
1100                        .unwrap_or_else(|| "@system/alter".to_string());
1101                    let changed = crate::runtime::signed_writes_kind::revoke_signer(
1102                        &store,
1103                        &query.name,
1104                        pubkey,
1105                        &actor,
1106                    );
1107                    messages.push(format!(
1108                        "signer {} on '{}'",
1109                        if changed {
1110                            "revoked"
1111                        } else {
1112                            "already revoked"
1113                        },
1114                        query.name
1115                    ));
1116                }
1117                AlterOperation::SetRetention { duration_ms } => {
1118                    // Issue #580 — validate that the collection has a
1119                    // timestamp column the lazy-on-scan filter can key
1120                    // off. Without one there is no anchor for "older
1121                    // than now - duration"; reject at ALTER time so the
1122                    // policy can never silently hide all rows.
1123                    let existing = self.inner.db.collection_contract(&query.name);
1124                    let has_ts_column = existing
1125                        .as_ref()
1126                        .map(retention_timestamp_column_exists)
1127                        .unwrap_or(false);
1128                    if !has_ts_column {
1129                        return Err(RedDBError::Query(format!(
1130                            "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1131                             column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1132                             or enable WITH timestamps = true before setting a \
1133                             retention policy",
1134                            query.name
1135                        )));
1136                    }
1137                    messages.push(format!(
1138                        "retention set to {duration_ms} ms on '{}'",
1139                        query.name
1140                    ));
1141                }
1142                AlterOperation::UnsetRetention => {
1143                    messages.push(format!("retention cleared on '{}'", query.name));
1144                }
1145                AlterOperation::AddAnalytics(views) => {
1146                    // Issue #801 — analytics lifecycle is graph-only. Reject the
1147                    // op on a non-graph collection so a misrouted ALTER fails
1148                    // loudly instead of silently writing analytics_config onto a
1149                    // model whose reader never resolves `<name>.<output>`.
1150                    self.require_graph_for_analytics(&query.name)?;
1151                    let existing = self.inner.db.collection_contract(&query.name);
1152                    let enabled: std::collections::BTreeSet<crate::catalog::AnalyticsOutput> =
1153                        existing
1154                            .as_ref()
1155                            .map(|c| c.analytics_config.iter().map(|v| v.output).collect())
1156                            .unwrap_or_default();
1157                    for view in views {
1158                        if enabled.contains(&view.output) {
1159                            // Idempotent: adding an already-enabled output is a
1160                            // no-op — no error, no duplicate state.
1161                            messages.push(format!(
1162                                "analytics '{}' already enabled on '{}'",
1163                                view.output.as_str(),
1164                                query.name
1165                            ));
1166                        } else {
1167                            messages.push(format!(
1168                                "analytics '{}' enabled on '{}'",
1169                                view.output.as_str(),
1170                                query.name
1171                            ));
1172                        }
1173                    }
1174                }
1175                AlterOperation::DropAnalytics(output) => {
1176                    self.require_graph_for_analytics(&query.name)?;
1177                    let enabled = self
1178                        .inner
1179                        .db
1180                        .collection_contract(&query.name)
1181                        .map(|c| c.analytics_config.iter().any(|v| v.output == *output))
1182                        .unwrap_or(false);
1183                    if !enabled {
1184                        // Dropping an output that was never enabled is a clean,
1185                        // explicit error (AC4) rather than a silent no-op.
1186                        return Err(RedDBError::Query(format!(
1187                            "ALTER GRAPH DROP ANALYTICS: analytics output '{}' is not enabled on graph '{}'",
1188                            output.as_str(),
1189                            query.name
1190                        )));
1191                    }
1192                    messages.push(format!(
1193                        "analytics '{}' disabled on '{}'",
1194                        output.as_str(),
1195                        query.name
1196                    ));
1197                }
1198            }
1199        }
1200
1201        let mut contract = self
1202            .inner
1203            .db
1204            .collection_contract(&query.name)
1205            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1206        apply_alter_operations_to_contract(&mut contract, &query.operations);
1207        contract.version = contract.version.saturating_add(1);
1208        contract.updated_at_unix_ms = current_unix_ms();
1209        self.inner
1210            .db
1211            .save_collection_contract(contract)
1212            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1213        // Issue #301 — emit OperatorEvent when column schema changes on a
1214        // collection that has active event subscriptions, so operators know
1215        // downstream consumers may see a different payload shape.
1216        if !fields_added.is_empty() || !fields_removed.is_empty() {
1217            let sub_names: Vec<String> = self
1218                .inner
1219                .db
1220                .collection_contract(&query.name)
1221                .map(|c| {
1222                    c.subscriptions
1223                        .iter()
1224                        .filter(|s| s.enabled)
1225                        .map(|s| s.name.clone())
1226                        .collect()
1227                })
1228                .unwrap_or_default();
1229            if !sub_names.is_empty() {
1230                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1231                    collection: query.name.clone(),
1232                    subscription_names: sub_names.join(", "),
1233                    fields_added: fields_added.join(", "),
1234                    fields_removed: fields_removed.join(", "),
1235                    lsn: self.cdc_current_lsn(),
1236                }
1237                .emit_global();
1238            }
1239        }
1240
1241        self.clear_table_planner_stats(&query.name);
1242        self.invalidate_result_cache();
1243        // Issue #120 — refresh the schema-vocabulary entries from the
1244        // post-ALTER contract. Drop+recreate inside the index keeps
1245        // the invalidation guarantee complete (no stale columns from
1246        // before an ALTER ... DROP COLUMN).
1247        let post_alter_columns: Vec<String> = self
1248            .inner
1249            .db
1250            .collection_contract(&query.name)
1251            .map(|contract| {
1252                contract
1253                    .declared_columns
1254                    .iter()
1255                    .map(|col| col.name.clone())
1256                    .collect()
1257            })
1258            .unwrap_or_default();
1259        self.schema_vocabulary_apply(
1260            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1261                collection: query.name.clone(),
1262                columns: post_alter_columns,
1263                type_tags: Vec::new(),
1264                description: None,
1265            },
1266        );
1267
1268        let message = if messages.is_empty() {
1269            format!("table '{}' altered (no operations)", query.name)
1270        } else {
1271            format!("table '{}' altered: {}", query.name, messages.join(", "))
1272        };
1273
1274        Ok(RuntimeQueryResult::ok_message(
1275            raw_query.to_string(),
1276            &message,
1277            "alter",
1278        ))
1279    }
1280
1281    /// Execute EXPLAIN ALTER FOR CREATE TABLE
1282    ///
1283    /// Pure read: computes the schema diff between the target table's
1284    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
1285    /// and returns it as SQL `ALTER TABLE` text (default) or structured
1286    /// JSON. Never mutates storage.
1287    pub fn execute_explain_alter(
1288        &self,
1289        raw_query: &str,
1290        query: &ExplainAlterQuery,
1291    ) -> RedDBResult<RuntimeQueryResult> {
1292        // Validate the target CREATE TABLE body so syntactically valid
1293        // but semantically broken targets (bad SQL types, duplicate
1294        // columns) are caught here rather than inside the diff engine.
1295        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1296
1297        let current_contract = self.inner.db.collection_contract(&query.target.name);
1298
1299        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1300            .as_ref()
1301            .map(|c| c.declared_columns.clone())
1302            .unwrap_or_default();
1303
1304        let diff = super::schema_diff::compute_column_diff(
1305            &query.target.name,
1306            &current_columns,
1307            &query.target.columns,
1308        );
1309
1310        let rendered = match query.format {
1311            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1312            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1313        };
1314
1315        let format_label = match query.format {
1316            ExplainFormat::Sql => "sql",
1317            ExplainFormat::Json => "json",
1318        };
1319
1320        let columns = vec![
1321            "table".to_string(),
1322            "format".to_string(),
1323            "diff".to_string(),
1324        ];
1325        let row = vec![
1326            ("table".to_string(), Value::text(query.target.name.clone())),
1327            ("format".to_string(), Value::text(format_label.to_string())),
1328            ("diff".to_string(), Value::text(rendered)),
1329        ];
1330
1331        Ok(RuntimeQueryResult::ok_records(
1332            raw_query.to_string(),
1333            columns,
1334            vec![row],
1335            "explain",
1336        ))
1337    }
1338
1339    /// Execute CREATE INDEX
1340    ///
1341    /// Registers a new index on a collection, builds it from existing data,
1342    /// and makes it available to the query executor for O(1) lookups.
1343    pub fn execute_create_index(
1344        &self,
1345        raw_query: &str,
1346        query: &CreateIndexQuery,
1347    ) -> RedDBResult<RuntimeQueryResult> {
1348        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1349        let store = self.inner.db.store();
1350
1351        // Verify the table exists
1352        let manager = store
1353            .get_collection(&query.table)
1354            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1355
1356        let method_kind = match query.method {
1357            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1358            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1359            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1360            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1361        };
1362
1363        // Extract fields from existing entities for indexing. Row
1364        // entities may arrive in either the "named" HashMap layout
1365        // (gRPC `BulkInsertBinary` path) OR the columnar layout
1366        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
1367        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
1368        // leaves `named == None`). Prior to this commit the columnar
1369        // branch returned an empty field list, so `CREATE INDEX` built
1370        // a zero-entity index over HTTP-inserted data even though the
1371        // data was queryable via `SELECT`.
1372        let entities = manager.query_all(|_| true);
1373        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1374            entities
1375                .iter()
1376                .map(|e| {
1377                    let fields = match &e.data {
1378                        crate::storage::EntityData::Row(row) => {
1379                            if let Some(ref named) = row.named {
1380                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1381                            } else if let Some(ref schema) = row.schema {
1382                                // Columnar layout — pair each column
1383                                // with its positional name from the
1384                                // shared schema Arc.
1385                                schema
1386                                    .iter()
1387                                    .zip(row.columns.iter())
1388                                    .map(|(k, v)| (k.clone(), v.clone()))
1389                                    .collect()
1390                            } else {
1391                                Vec::new()
1392                            }
1393                        }
1394                        crate::storage::EntityData::Node(node) => node
1395                            .properties
1396                            .iter()
1397                            .map(|(k, v)| (k.clone(), v.clone()))
1398                            .collect(),
1399                        _ => Vec::new(),
1400                    };
1401                    (e.id, fields)
1402                })
1403                .collect();
1404
1405        // Build the index
1406        let indexed_count = self
1407            .inner
1408            .index_store
1409            .create_index(
1410                &query.name,
1411                &query.table,
1412                &query.columns,
1413                method_kind,
1414                query.unique,
1415                &entity_fields,
1416            )
1417            .map_err(RedDBError::Internal)?;
1418
1419        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1420            &query.table,
1421            &entity_fields,
1422        );
1423        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1424        self.invalidate_plan_cache();
1425
1426        // Register metadata
1427        self.inner
1428            .index_store
1429            .register(super::index_store::RegisteredIndex {
1430                name: query.name.clone(),
1431                collection: query.table.clone(),
1432                columns: query.columns.clone(),
1433                method: method_kind,
1434                unique: query.unique,
1435            });
1436        // Issue #120 — surface the index name + indexed columns in
1437        // the schema-vocabulary so AskPipeline (#121) can resolve
1438        // "the email index" back to its collection.
1439        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1440            collection: query.table.clone(),
1441            index: query.name.clone(),
1442            columns: query.columns.clone(),
1443        });
1444
1445        let method_str = format!("{}", query.method);
1446        let unique_str = if query.unique { "unique " } else { "" };
1447        let cols = query.columns.join(", ");
1448
1449        Ok(RuntimeQueryResult::ok_message(
1450            raw_query.to_string(),
1451            &format!(
1452                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1453                unique_str, query.name, query.table, cols, method_str, indexed_count
1454            ),
1455            "create",
1456        ))
1457    }
1458
1459    /// Execute DROP INDEX
1460    ///
1461    /// Removes an index from a collection.
1462    pub fn execute_drop_index(
1463        &self,
1464        raw_query: &str,
1465        query: &DropIndexQuery,
1466    ) -> RedDBResult<RuntimeQueryResult> {
1467        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1468        let store = self.inner.db.store();
1469
1470        // Verify the table exists
1471        if store.get_collection(&query.table).is_none() {
1472            if query.if_exists {
1473                return Ok(RuntimeQueryResult::ok_message(
1474                    raw_query.to_string(),
1475                    &format!("table '{}' does not exist", query.table),
1476                    "drop",
1477                ));
1478            }
1479            return Err(RedDBError::NotFound(format!(
1480                "table '{}' not found",
1481                query.table
1482            )));
1483        }
1484
1485        // Remove from IndexStore
1486        self.inner.index_store.drop_index(&query.name, &query.table);
1487        self.invalidate_plan_cache();
1488        // Issue #120 — keep the schema-vocabulary index entry in sync.
1489        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1490            collection: query.table.clone(),
1491            index: query.name.clone(),
1492        });
1493
1494        Ok(RuntimeQueryResult::ok_message(
1495            raw_query.to_string(),
1496            &format!("index '{}' dropped from '{}'", query.name, query.table),
1497            "drop",
1498        ))
1499    }
1500
1501    fn execute_drop_typed_collection(
1502        &self,
1503        raw_query: &str,
1504        name: &str,
1505        if_exists: bool,
1506        expected_model: CollectionModel,
1507        label: &str,
1508    ) -> RedDBResult<RuntimeQueryResult> {
1509        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1510        if is_system_schema_name(name) {
1511            return Err(RedDBError::Query("system schema is read-only".to_string()));
1512        }
1513        let store = self.inner.db.store();
1514        if store.get_collection(name).is_none() {
1515            if if_exists {
1516                return Ok(RuntimeQueryResult::ok_message(
1517                    raw_query.to_string(),
1518                    &format!("{label} '{name}' does not exist"),
1519                    "drop",
1520                ));
1521            }
1522            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1523        }
1524
1525        let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1526        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1527        self.drop_collection_storage(raw_query, name, label)
1528    }
1529
1530    pub fn execute_truncate(
1531        &self,
1532        raw_query: &str,
1533        query: &TruncateQuery,
1534    ) -> RedDBResult<RuntimeQueryResult> {
1535        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1536        if is_system_schema_name(&query.name) {
1537            return Err(RedDBError::Query("system schema is read-only".to_string()));
1538        }
1539
1540        let label = query
1541            .model
1542            .map(polymorphic_resolver::model_name)
1543            .unwrap_or("collection");
1544        let store = self.inner.db.store();
1545        if store.get_collection(&query.name).is_none() {
1546            if query.if_exists {
1547                return Ok(RuntimeQueryResult::ok_message(
1548                    raw_query.to_string(),
1549                    &format!("{label} '{}' does not exist", query.name),
1550                    "truncate",
1551                ));
1552            }
1553            return Err(RedDBError::NotFound(format!(
1554                "{label} '{}' not found",
1555                query.name
1556            )));
1557        }
1558
1559        let actual =
1560            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1561        if let Some(expected) = query.model {
1562            polymorphic_resolver::ensure_model_match(expected, actual)?;
1563        }
1564
1565        if actual == CollectionModel::Queue {
1566            return self.execute_queue_command(
1567                raw_query,
1568                &QueueCommand::Purge {
1569                    queue: query.name.clone(),
1570                },
1571            );
1572        }
1573
1574        // Count before wiping so we can emit the aggregated truncate event.
1575        let affected = self.truncate_collection_entities(&query.name)?;
1576        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1577        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1578        self.inner.db.invalidate_vector_index(&query.name);
1579        self.clear_table_planner_stats(&query.name);
1580        self.invalidate_result_cache();
1581
1582        Ok(RuntimeQueryResult::ok_message(
1583            raw_query.to_string(),
1584            &format!(
1585                "{affected} entities truncated from {label} '{}'",
1586                query.name
1587            ),
1588            "truncate",
1589        ))
1590    }
1591
1592    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1593        let store = self.inner.db.store();
1594        let Some(manager) = store.get_collection(name) else {
1595            return Ok(0);
1596        };
1597        let entities = manager.query_all(|_| true);
1598        if entities.is_empty() {
1599            return Ok(0);
1600        }
1601
1602        for entity in &entities {
1603            let fields = entity_index_fields(&entity.data);
1604            self.inner
1605                .index_store
1606                .index_entity_delete(name, entity.id, &fields)
1607                .map_err(RedDBError::Internal)?;
1608        }
1609
1610        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1611        let deleted_ids = store
1612            .delete_batch(name, &ids)
1613            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1614        for id in &deleted_ids {
1615            store.context_index().remove_entity(*id);
1616        }
1617        Ok(deleted_ids.len() as u64)
1618    }
1619
1620    fn drop_collection_storage(
1621        &self,
1622        raw_query: &str,
1623        name: &str,
1624        label: &str,
1625    ) -> RedDBResult<RuntimeQueryResult> {
1626        let store = self.inner.db.store();
1627
1628        // Emit 1 collection_dropped event before storage is wiped.
1629        // Queue is preserved; subscription is removed with the contract below.
1630        let final_count = store
1631            .get_collection(name)
1632            .map(|manager| manager.query_all(|_| true).len() as u64)
1633            .unwrap_or(0);
1634        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1635            self,
1636            name,
1637            final_count,
1638        )?;
1639
1640        let orphaned_indices: Vec<String> = self
1641            .inner
1642            .index_store
1643            .list_indices(name)
1644            .into_iter()
1645            .map(|index| index.name)
1646            .collect();
1647        for index_name in &orphaned_indices {
1648            self.inner.index_store.drop_index(index_name, name);
1649        }
1650
1651        store
1652            .drop_collection(name)
1653            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1654        self.inner.db.invalidate_vector_index(name);
1655        self.inner.db.clear_collection_default_ttl_ms(name);
1656        self.inner
1657            .db
1658            .remove_collection_contract(name)
1659            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1660        self.clear_table_planner_stats(name);
1661        self.invalidate_result_cache();
1662        if let Some(store) = self.inner.auth_store.read().clone() {
1663            store.invalidate_visible_collections_cache();
1664        }
1665        self.inner
1666            .db
1667            .persist_metadata()
1668            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1669        self.schema_vocabulary_apply(
1670            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1671                collection: name.to_string(),
1672            },
1673        );
1674
1675        Ok(RuntimeQueryResult::ok_message(
1676            raw_query.to_string(),
1677            &format!("{label} '{name}' dropped"),
1678            "drop",
1679        ))
1680    }
1681}
1682
/// Returns `true` when `name` belongs to the read-only system schema:
/// exactly `red`, anything under the `red.` namespace, or anything
/// starting with `__red_schema_`. The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const SYSTEM_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    if name == "red" {
        return true;
    }
    SYSTEM_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1686
1687fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1688    match data {
1689        EntityData::Row(row) => {
1690            if let Some(ref named) = row.named {
1691                named
1692                    .iter()
1693                    .map(|(key, value)| (key.clone(), value.clone()))
1694                    .collect()
1695            } else if let Some(ref schema) = row.schema {
1696                schema
1697                    .iter()
1698                    .zip(row.columns.iter())
1699                    .map(|(key, value)| (key.clone(), value.clone()))
1700                    .collect()
1701            } else {
1702                Vec::new()
1703            }
1704        }
1705        EntityData::Node(node) => node
1706            .properties
1707            .iter()
1708            .map(|(key, value)| (key.clone(), value.clone()))
1709            .collect(),
1710        _ => Vec::new(),
1711    }
1712}
1713
1714fn collection_contract_from_create_table(
1715    query: &CreateTableQuery,
1716) -> RedDBResult<crate::physical::CollectionContract> {
1717    let now = current_unix_ms();
1718    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1719        .columns
1720        .iter()
1721        .map(declared_column_contract_from_ddl)
1722        .collect();
1723    if query.timestamps {
1724        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1725        // columns that the write path populates from
1726        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1727        declared_columns.push(crate::physical::DeclaredColumnContract {
1728            name: "created_at".to_string(),
1729            data_type: "BIGINT".to_string(),
1730            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1731            not_null: true,
1732            default: None,
1733            compress: None,
1734            unique: false,
1735            primary_key: false,
1736            enum_variants: Vec::new(),
1737            array_element: None,
1738            decimal_precision: None,
1739        });
1740        declared_columns.push(crate::physical::DeclaredColumnContract {
1741            name: "updated_at".to_string(),
1742            data_type: "BIGINT".to_string(),
1743            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1744            not_null: true,
1745            default: None,
1746            compress: None,
1747            unique: false,
1748            primary_key: false,
1749            enum_variants: Vec::new(),
1750            array_element: None,
1751            decimal_precision: None,
1752        });
1753    }
1754    Ok(crate::physical::CollectionContract {
1755        name: query.name.clone(),
1756        declared_model: crate::catalog::CollectionModel::Table,
1757        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1758        origin: crate::physical::ContractOrigin::Explicit,
1759        version: 1,
1760        created_at_unix_ms: now,
1761        updated_at_unix_ms: now,
1762        default_ttl_ms: query.default_ttl_ms,
1763        vector_dimension: None,
1764        vector_metric: None,
1765        context_index_fields: query.context_index_fields.clone(),
1766        declared_columns,
1767        table_def: Some(build_table_def_from_create_table(query)?),
1768        timestamps_enabled: query.timestamps,
1769        context_index_enabled: query.context_index_enabled
1770            || !query.context_index_fields.is_empty(),
1771        metrics_raw_retention_ms: None,
1772        metrics_rollup_policies: Vec::new(),
1773        metrics_tenant_identity: None,
1774        metrics_namespace: None,
1775        append_only: query.append_only,
1776        subscriptions: query.subscriptions.clone(),
1777        analytics_config: Vec::new(),
1778        session_key: None,
1779        session_gap_ms: None,
1780        retention_duration_ms: None,
1781        analytical_storage: None,
1782    })
1783}
1784
1785fn default_collection_contract_for_existing_table(
1786    name: &str,
1787) -> crate::physical::CollectionContract {
1788    let now = current_unix_ms();
1789    crate::physical::CollectionContract {
1790        name: name.to_string(),
1791        declared_model: crate::catalog::CollectionModel::Table,
1792        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1793        origin: crate::physical::ContractOrigin::Explicit,
1794        version: 0,
1795        created_at_unix_ms: now,
1796        updated_at_unix_ms: now,
1797        default_ttl_ms: None,
1798        vector_dimension: None,
1799        vector_metric: None,
1800        context_index_fields: Vec::new(),
1801        declared_columns: Vec::new(),
1802        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1803        timestamps_enabled: false,
1804        context_index_enabled: false,
1805        metrics_raw_retention_ms: None,
1806        metrics_rollup_policies: Vec::new(),
1807        metrics_tenant_identity: None,
1808        metrics_namespace: None,
1809        append_only: false,
1810        subscriptions: Vec::new(),
1811        analytics_config: Vec::new(),
1812        session_key: None,
1813        session_gap_ms: None,
1814        retention_duration_ms: None,
1815        analytical_storage: None,
1816    }
1817}
1818
1819fn keyed_collection_contract(
1820    name: &str,
1821    model: crate::catalog::CollectionModel,
1822    analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
1823) -> crate::physical::CollectionContract {
1824    let now = current_unix_ms();
1825    crate::physical::CollectionContract {
1826        name: name.to_string(),
1827        declared_model: model,
1828        schema_mode: crate::catalog::SchemaMode::Dynamic,
1829        origin: crate::physical::ContractOrigin::Explicit,
1830        version: 1,
1831        created_at_unix_ms: now,
1832        updated_at_unix_ms: now,
1833        default_ttl_ms: None,
1834        vector_dimension: None,
1835        vector_metric: None,
1836        context_index_fields: Vec::new(),
1837        declared_columns: Vec::new(),
1838        table_def: None,
1839        timestamps_enabled: false,
1840        context_index_enabled: false,
1841        metrics_raw_retention_ms: None,
1842        metrics_rollup_policies: Vec::new(),
1843        metrics_tenant_identity: None,
1844        metrics_namespace: None,
1845        append_only: false,
1846        subscriptions: Vec::new(),
1847        analytics_config,
1848        session_key: None,
1849        session_gap_ms: None,
1850        retention_duration_ms: None,
1851        analytical_storage: None,
1852    }
1853}
1854
1855fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1856    let now = current_unix_ms();
1857    crate::physical::CollectionContract {
1858        name: query.name.clone(),
1859        declared_model: crate::catalog::CollectionModel::Metrics,
1860        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1861        origin: crate::physical::ContractOrigin::Explicit,
1862        version: 1,
1863        created_at_unix_ms: now,
1864        updated_at_unix_ms: now,
1865        default_ttl_ms: query.default_ttl_ms,
1866        vector_dimension: None,
1867        vector_metric: None,
1868        context_index_fields: Vec::new(),
1869        declared_columns: Vec::new(),
1870        table_def: None,
1871        timestamps_enabled: false,
1872        context_index_enabled: false,
1873        metrics_raw_retention_ms: query.default_ttl_ms,
1874        metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1875        metrics_tenant_identity: Some(
1876            query
1877                .tenant_by
1878                .clone()
1879                .unwrap_or_else(|| "current_tenant".to_string()),
1880        ),
1881        metrics_namespace: Some("default".to_string()),
1882        append_only: true,
1883        subscriptions: Vec::new(),
1884        analytics_config: Vec::new(),
1885        session_key: None,
1886        session_gap_ms: None,
1887        retention_duration_ms: None,
1888        analytical_storage: None,
1889    }
1890}
1891
1892fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1893    let now = current_unix_ms();
1894    crate::physical::CollectionContract {
1895        name: query.name.clone(),
1896        declared_model: crate::catalog::CollectionModel::Vector,
1897        schema_mode: crate::catalog::SchemaMode::Dynamic,
1898        origin: crate::physical::ContractOrigin::Explicit,
1899        version: 1,
1900        created_at_unix_ms: now,
1901        updated_at_unix_ms: now,
1902        default_ttl_ms: None,
1903        vector_dimension: Some(query.dimension),
1904        vector_metric: Some(query.metric),
1905        context_index_fields: Vec::new(),
1906        declared_columns: Vec::new(),
1907        table_def: None,
1908        timestamps_enabled: false,
1909        context_index_enabled: false,
1910        metrics_raw_retention_ms: None,
1911        metrics_rollup_policies: Vec::new(),
1912        metrics_tenant_identity: None,
1913        metrics_namespace: None,
1914        append_only: false,
1915        subscriptions: Vec::new(),
1916        analytics_config: Vec::new(),
1917        session_key: None,
1918        session_gap_ms: None,
1919        retention_duration_ms: None,
1920        analytical_storage: None,
1921    }
1922}
1923
1924fn declared_column_contract_from_ddl(
1925    column: &CreateColumnDef,
1926) -> crate::physical::DeclaredColumnContract {
1927    crate::physical::DeclaredColumnContract {
1928        name: column.name.clone(),
1929        data_type: column.data_type.clone(),
1930        sql_type: Some(column.sql_type.clone()),
1931        not_null: column.not_null,
1932        default: column.default.clone(),
1933        compress: column.compress,
1934        unique: column.unique,
1935        primary_key: column.primary_key,
1936        enum_variants: column.enum_variants.clone(),
1937        array_element: column.array_element.clone(),
1938        decimal_precision: column.decimal_precision,
1939    }
1940}
1941
1942fn apply_alter_operations_to_contract(
1943    contract: &mut crate::physical::CollectionContract,
1944    operations: &[AlterOperation],
1945) {
1946    if contract.table_def.is_none() {
1947        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1948    }
1949    for operation in operations {
1950        match operation {
1951            AlterOperation::AddColumn(column) => {
1952                if !contract
1953                    .declared_columns
1954                    .iter()
1955                    .any(|existing| existing.name == column.name)
1956                {
1957                    contract
1958                        .declared_columns
1959                        .push(declared_column_contract_from_ddl(column));
1960                }
1961                if let Some(table_def) = contract.table_def.as_mut() {
1962                    if table_def.get_column(&column.name).is_none() {
1963                        if let Ok(column_def) = column_def_from_ddl(column) {
1964                            if column.primary_key {
1965                                table_def.primary_key.push(column.name.clone());
1966                                table_def.constraints.push(
1967                                    crate::storage::schema::Constraint::new(
1968                                        format!("pk_{}", column.name),
1969                                        crate::storage::schema::ConstraintType::PrimaryKey,
1970                                    )
1971                                    .on_columns(vec![column.name.clone()]),
1972                                );
1973                            }
1974                            if column.unique {
1975                                table_def.constraints.push(
1976                                    crate::storage::schema::Constraint::new(
1977                                        format!("uniq_{}", column.name),
1978                                        crate::storage::schema::ConstraintType::Unique,
1979                                    )
1980                                    .on_columns(vec![column.name.clone()]),
1981                                );
1982                            }
1983                            if column.not_null {
1984                                table_def.constraints.push(
1985                                    crate::storage::schema::Constraint::new(
1986                                        format!("not_null_{}", column.name),
1987                                        crate::storage::schema::ConstraintType::NotNull,
1988                                    )
1989                                    .on_columns(vec![column.name.clone()]),
1990                                );
1991                            }
1992                            table_def.columns.push(column_def);
1993                        }
1994                    }
1995                }
1996            }
1997            AlterOperation::DropColumn(name) => {
1998                contract
1999                    .declared_columns
2000                    .retain(|column| column.name != *name);
2001                if let Some(table_def) = contract.table_def.as_mut() {
2002                    if let Some(index) = table_def.column_index(name) {
2003                        table_def.columns.remove(index);
2004                    }
2005                    table_def.primary_key.retain(|column| column != name);
2006                    table_def.constraints.retain(|constraint| {
2007                        !constraint.columns.iter().any(|column| column == name)
2008                    });
2009                    table_def
2010                        .indexes
2011                        .retain(|index| !index.columns.iter().any(|column| column == name));
2012                }
2013            }
2014            AlterOperation::RenameColumn { from, to } => {
2015                if contract
2016                    .declared_columns
2017                    .iter()
2018                    .any(|column| column.name == *to)
2019                {
2020                    continue;
2021                }
2022                if let Some(column) = contract
2023                    .declared_columns
2024                    .iter_mut()
2025                    .find(|column| column.name == *from)
2026                {
2027                    column.name = to.clone();
2028                }
2029                if let Some(table_def) = contract.table_def.as_mut() {
2030                    if let Some(column) = table_def
2031                        .columns
2032                        .iter_mut()
2033                        .find(|column| column.name == *from)
2034                    {
2035                        column.name = to.clone();
2036                    }
2037                    for primary_key in &mut table_def.primary_key {
2038                        if *primary_key == *from {
2039                            *primary_key = to.clone();
2040                        }
2041                    }
2042                    for constraint in &mut table_def.constraints {
2043                        for column in &mut constraint.columns {
2044                            if *column == *from {
2045                                *column = to.clone();
2046                            }
2047                        }
2048                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
2049                            for column in ref_columns {
2050                                if *column == *from {
2051                                    *column = to.clone();
2052                                }
2053                            }
2054                        }
2055                    }
2056                    for index in &mut table_def.indexes {
2057                        for column in &mut index.columns {
2058                            if *column == *from {
2059                                *column = to.clone();
2060                            }
2061                        }
2062                    }
2063                }
2064            }
2065            // Partition ops don't touch the column contract — metadata is
2066            // persisted separately via `red_config.partition.*`.
2067            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
2068            // RLS toggles don't touch the column contract — flag is persisted
2069            // separately via `red_config.rls.enabled.{table}` and enforced
2070            // through the in-memory `rls_enabled_tables` set.
2071            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
2072            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
2073            // and are enforced through `tenant_tables` + RLS auto-policy.
2074            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
2075            AlterOperation::SetAppendOnly(on) => {
2076                contract.append_only = *on;
2077            }
2078            // VCS opt-in is persisted to red_vcs_settings by the
2079            // executor, not the contract — nothing to do here.
2080            AlterOperation::SetVersioned(_) => {}
2081            AlterOperation::EnableEvents(subscription) => {
2082                let mut subscription = subscription.clone();
2083                subscription.source = contract.name.clone();
2084                subscription.enabled = true;
2085                if let Some(existing) = contract
2086                    .subscriptions
2087                    .iter_mut()
2088                    .find(|existing| existing.target_queue == subscription.target_queue)
2089                {
2090                    *existing = subscription;
2091                } else {
2092                    contract.subscriptions.push(subscription);
2093                }
2094            }
2095            AlterOperation::DisableEvents => {
2096                for subscription in &mut contract.subscriptions {
2097                    subscription.enabled = false;
2098                }
2099            }
2100            AlterOperation::AddSubscription { name, descriptor } => {
2101                let mut sub = descriptor.clone();
2102                sub.name = name.clone();
2103                sub.source = contract.name.clone();
2104                sub.enabled = true;
2105                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
2106                {
2107                    *existing = sub;
2108                } else {
2109                    contract.subscriptions.push(sub);
2110                }
2111            }
2112            AlterOperation::DropSubscription { name } => {
2113                contract.subscriptions.retain(|s| s.name != *name);
2114            }
2115            // Signer registry mutations live in `red_config` outside the
2116            // contract surface — the executor applied them directly via
2117            // `signed_writes_kind::{add,revoke}_signer`. Nothing to fold
2118            // into the column-shaped contract.
2119            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
2120            AlterOperation::SetRetention { duration_ms } => {
2121                contract.retention_duration_ms = Some(*duration_ms);
2122            }
2123            AlterOperation::UnsetRetention => {
2124                contract.retention_duration_ms = None;
2125            }
2126            // Issue #801 — fold analytics lifecycle into the WAL-backed
2127            // `analytics_config` so the change is durable and the
2128            // `<graph>.<output>` resolver picks it up on the next read.
2129            AlterOperation::AddAnalytics(views) => {
2130                for view in views {
2131                    // Idempotent: skip outputs already enabled so a repeated
2132                    // ADD never duplicates state. The first declaration wins —
2133                    // re-declaring options requires DROP then ADD.
2134                    if !contract
2135                        .analytics_config
2136                        .iter()
2137                        .any(|existing| existing.output == view.output)
2138                    {
2139                        contract.analytics_config.push(view.clone());
2140                    }
2141                }
2142            }
2143            AlterOperation::DropAnalytics(output) => {
2144                contract
2145                    .analytics_config
2146                    .retain(|view| view.output != *output);
2147            }
2148        }
2149    }
2150}
2151
2152/// Issue #580 — returns true if the contract carries at least one
2153/// column the retention filter can use as a timestamp anchor:
2154/// either `WITH timestamps = true` (auto `created_at` / `updated_at`)
2155/// or a user-declared column with a temporal data_type.
2156pub(crate) fn retention_timestamp_column_exists(
2157    contract: &crate::physical::CollectionContract,
2158) -> bool {
2159    if contract.timestamps_enabled {
2160        return true;
2161    }
2162    if matches!(
2163        contract.declared_model,
2164        crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
2165    ) {
2166        // Time-series and metrics collections carry an intrinsic
2167        // timestamp axis on every row even without a declared column;
2168        // retention has a natural anchor.
2169        return true;
2170    }
2171    contract
2172        .declared_columns
2173        .iter()
2174        .any(|column| is_temporal_data_type(&column.data_type))
2175}
2176
/// Returns `true` when `data_type` names one of the temporal column types
/// accepted as a retention anchor. The comparison ignores ASCII case and
/// requires an exact match otherwise (no trimming, no type parameters).
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];
    TEMPORAL_TYPES
        .iter()
        .any(|candidate| data_type.eq_ignore_ascii_case(candidate))
}
2184
2185fn validate_event_subscriptions(
2186    runtime: &RedDBRuntime,
2187    source: &str,
2188    subscriptions: &[crate::catalog::SubscriptionDescriptor],
2189) -> RedDBResult<()> {
2190    for subscription in subscriptions
2191        .iter()
2192        .filter(|subscription| subscription.enabled)
2193    {
2194        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2195            return Err(RedDBError::Query(
2196                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2197            ));
2198        }
2199        validate_subscription_auth(runtime, source, subscription)?;
2200        if subscription.target_queue == source
2201            || subscription_would_create_cycle(
2202                &runtime.inner.db,
2203                source,
2204                &subscription.target_queue,
2205            )
2206        {
2207            return Err(RedDBError::Query(
2208                "subscription would create cycle".to_string(),
2209            ));
2210        }
2211        audit_subscription_redact_gap(runtime, source, subscription);
2212    }
2213    Ok(())
2214}
2215
2216fn validate_subscription_auth(
2217    runtime: &RedDBRuntime,
2218    source: &str,
2219    subscription: &crate::catalog::SubscriptionDescriptor,
2220) -> RedDBResult<()> {
2221    let auth_store = match runtime.inner.auth_store.read().clone() {
2222        Some(store) => store,
2223        None => return Ok(()),
2224    };
2225    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2226        Some(identity) => identity,
2227        None => return Ok(()),
2228    };
2229    let tenant = crate::runtime::impl_core::current_tenant();
2230    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2231
2232    if auth_store.iam_authorization_enabled() {
2233        let ctx = crate::auth::policies::EvalContext {
2234            principal_tenant: tenant.clone(),
2235            current_tenant: tenant.clone(),
2236            peer_ip: None,
2237            mfa_present: false,
2238            now_ms: crate::auth::now_ms(),
2239            principal_is_admin_role: role == crate::auth::Role::Admin,
2240            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2241            principal_is_platform_scoped: principal.tenant.is_none(),
2242        };
2243        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2244        if let Some(t) = tenant.as_deref() {
2245            source_resource = source_resource.with_tenant(t.to_string());
2246        }
2247        if !auth_store.check_policy_authz_with_role(
2248            &principal,
2249            "select",
2250            &source_resource,
2251            &ctx,
2252            role,
2253        ) {
2254            return Err(RedDBError::Query(format!(
2255                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2256                principal, source_resource.kind, source_resource.name
2257            )));
2258        }
2259
2260        let mut target_resource =
2261            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2262        if let Some(t) = tenant.as_deref() {
2263            target_resource = target_resource.with_tenant(t.to_string());
2264        }
2265        if !auth_store.check_policy_authz_with_role(
2266            &principal,
2267            "write",
2268            &target_resource,
2269            &ctx,
2270            role,
2271        ) {
2272            return Err(RedDBError::Query(format!(
2273                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2274                principal, target_resource.kind, target_resource.name
2275            )));
2276        }
2277        return Ok(());
2278    }
2279
2280    let ctx = crate::auth::privileges::AuthzContext {
2281        principal: &username,
2282        effective_role: role,
2283        tenant: tenant.as_deref(),
2284    };
2285    auth_store
2286        .check_grant(
2287            &ctx,
2288            crate::auth::privileges::Action::Select,
2289            &crate::auth::privileges::Resource::table_from_name(source),
2290        )
2291        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2292    auth_store
2293        .check_grant(
2294            &ctx,
2295            crate::auth::privileges::Action::Insert,
2296            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2297        )
2298        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2299    Ok(())
2300}
2301
2302fn audit_subscription_redact_gap(
2303    runtime: &RedDBRuntime,
2304    source: &str,
2305    subscription: &crate::catalog::SubscriptionDescriptor,
2306) {
2307    let auth_store = match runtime.inner.auth_store.read().clone() {
2308        Some(store) if store.iam_authorization_enabled() => store,
2309        _ => return,
2310    };
2311    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2312        Some(identity) => identity,
2313        None => return,
2314    };
2315    let tenant = crate::runtime::impl_core::current_tenant();
2316    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2317    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2318    if missing.is_empty() {
2319        return;
2320    }
2321
2322    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2323    tracing::warn!(
2324        target: "reddb::operator",
2325        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2326        source,
2327        subscription.target_queue,
2328        columns
2329    );
2330    let mut event = AuditEvent::builder("subscription_redact_gap")
2331        .principal(username)
2332        .source(AuditAuthSource::System)
2333        .resource(format!(
2334            "subscription:{}->{}",
2335            source, subscription.target_queue
2336        ))
2337        .outcome(Outcome::Success)
2338        .field(AuditFieldEscaper::field("source", source))
2339        .field(AuditFieldEscaper::field(
2340            "target_queue",
2341            subscription.target_queue.clone(),
2342        ))
2343        .field(AuditFieldEscaper::field(
2344            "subscription",
2345            subscription.name.clone(),
2346        ))
2347        .field(AuditFieldEscaper::field("columns", columns))
2348        .field(AuditFieldEscaper::field("role", role.as_str()));
2349    if let Some(t) = tenant {
2350        event = event.tenant(t);
2351    }
2352    runtime.inner.audit_log.record_event(event.build());
2353}
2354
2355fn subscription_redact_gap_columns(
2356    auth_store: &crate::auth::store::AuthStore,
2357    principal: &crate::auth::UserId,
2358    source: &str,
2359    subscription: &crate::catalog::SubscriptionDescriptor,
2360) -> BTreeSet<String> {
2361    let redacted: HashSet<String> = subscription
2362        .redact_fields
2363        .iter()
2364        .map(|field| field.to_ascii_lowercase())
2365        .collect();
2366    auth_store
2367        .effective_policies(principal)
2368        .iter()
2369        .flat_map(|policy| policy.statements.iter())
2370        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2371        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2372        .flat_map(|statement| statement.resources.iter())
2373        .filter_map(|resource| denied_column_for_source(resource, source))
2374        .filter(|column| !redact_covers_column(&redacted, source, column))
2375        .collect()
2376}
2377
2378fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2379    match pattern {
2380        crate::auth::policies::ActionPattern::Wildcard => true,
2381        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2382        crate::auth::policies::ActionPattern::Prefix(prefix) => {
2383            "select".len() > prefix.len() + 1
2384                && "select".starts_with(prefix)
2385                && "select".as_bytes()[prefix.len()] == b':'
2386        }
2387    }
2388}
2389
2390fn denied_column_for_source(
2391    resource: &crate::auth::policies::ResourcePattern,
2392    source: &str,
2393) -> Option<String> {
2394    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2395        return None;
2396    };
2397    if kind != "column" {
2398        return None;
2399    }
2400    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2401    (column.table_resource_name() == source).then_some(column.column)
2402}
2403
/// Returns `true` when the redaction set covers `column` of `source`.
///
/// A column is covered when `redacted` contains the wildcard `*`, the
/// lower-cased column name, or the lower-cased `source.column` form.
/// Entries in `redacted` are compared as-is, so they must already be
/// lower-case to match.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(bare.as_str()) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(qualified.as_str())
}
2409
2410fn subscription_would_create_cycle(
2411    db: &crate::storage::unified::devx::RedDB,
2412    source: &str,
2413    target: &str,
2414) -> bool {
2415    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2416    for contract in db.collection_contracts() {
2417        for subscription in contract
2418            .subscriptions
2419            .into_iter()
2420            .filter(|subscription| subscription.enabled)
2421        {
2422            graph
2423                .entry(subscription.source)
2424                .or_default()
2425                .push(subscription.target_queue);
2426        }
2427    }
2428    graph
2429        .entry(source.to_string())
2430        .or_default()
2431        .push(target.to_string());
2432
2433    let mut stack = vec![target.to_string()];
2434    let mut seen = HashSet::new();
2435    while let Some(node) = stack.pop() {
2436        if node == source {
2437            return true;
2438        }
2439        if !seen.insert(node.clone()) {
2440            continue;
2441        }
2442        if let Some(next) = graph.get(&node) {
2443            stack.extend(next.iter().cloned());
2444        }
2445    }
2446    false
2447}
2448
2449pub(crate) fn ensure_event_target_queue_pub(
2450    runtime: &RedDBRuntime,
2451    queue: &str,
2452) -> RedDBResult<()> {
2453    ensure_event_target_queue(runtime, queue)
2454}
2455
2456fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2457    let store = runtime.inner.db.store();
2458    if store.get_collection(queue).is_some() {
2459        return Ok(());
2460    }
2461    store
2462        .create_collection(queue)
2463        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2464    runtime
2465        .inner
2466        .db
2467        .save_collection_contract(event_queue_collection_contract(queue))
2468        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2469    store.set_config_tree(
2470        &format!("queue.{queue}.mode"),
2471        &crate::serde_json::Value::String("fanout".to_string()),
2472    );
2473    Ok(())
2474}
2475
2476fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2477    let now = current_unix_ms();
2478    crate::physical::CollectionContract {
2479        name: queue.to_string(),
2480        declared_model: crate::catalog::CollectionModel::Queue,
2481        schema_mode: crate::catalog::SchemaMode::Dynamic,
2482        origin: crate::physical::ContractOrigin::Implicit,
2483        version: 1,
2484        created_at_unix_ms: now,
2485        updated_at_unix_ms: now,
2486        default_ttl_ms: None,
2487        vector_dimension: None,
2488        vector_metric: None,
2489        context_index_fields: Vec::new(),
2490        declared_columns: Vec::new(),
2491        table_def: None,
2492        timestamps_enabled: false,
2493        context_index_enabled: false,
2494        metrics_raw_retention_ms: None,
2495        metrics_rollup_policies: Vec::new(),
2496        metrics_tenant_identity: None,
2497        metrics_namespace: None,
2498        append_only: true,
2499        subscriptions: Vec::new(),
2500        analytics_config: Vec::new(),
2501        session_key: None,
2502        session_gap_ms: None,
2503        retention_duration_ms: None,
2504        analytical_storage: None,
2505    }
2506}
2507
2508fn build_table_def_from_create_table(
2509    query: &CreateTableQuery,
2510) -> RedDBResult<crate::storage::schema::TableDef> {
2511    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2512    for column in &query.columns {
2513        if column.primary_key {
2514            table.primary_key.push(column.name.clone());
2515            table.constraints.push(
2516                crate::storage::schema::Constraint::new(
2517                    format!("pk_{}", column.name),
2518                    crate::storage::schema::ConstraintType::PrimaryKey,
2519                )
2520                .on_columns(vec![column.name.clone()]),
2521            );
2522        }
2523        if column.unique {
2524            table.constraints.push(
2525                crate::storage::schema::Constraint::new(
2526                    format!("uniq_{}", column.name),
2527                    crate::storage::schema::ConstraintType::Unique,
2528                )
2529                .on_columns(vec![column.name.clone()]),
2530            );
2531        }
2532        if column.not_null {
2533            table.constraints.push(
2534                crate::storage::schema::Constraint::new(
2535                    format!("not_null_{}", column.name),
2536                    crate::storage::schema::ConstraintType::NotNull,
2537                )
2538                .on_columns(vec![column.name.clone()]),
2539            );
2540        }
2541        table.columns.push(column_def_from_ddl(column)?);
2542    }
2543    // WITH timestamps = true: append the two runtime-managed columns
2544    // to the schema so resolved_contract_columns exposes them to the
2545    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
2546    // not-nullable; the write path auto-fills them.
2547    if query.timestamps {
2548        table.columns.push(
2549            crate::storage::schema::ColumnDef::new(
2550                "created_at".to_string(),
2551                crate::storage::schema::DataType::UnsignedInteger,
2552            )
2553            .not_null(),
2554        );
2555        table.columns.push(
2556            crate::storage::schema::ColumnDef::new(
2557                "updated_at".to_string(),
2558                crate::storage::schema::DataType::UnsignedInteger,
2559            )
2560            .not_null(),
2561        );
2562        table.constraints.push(
2563            crate::storage::schema::Constraint::new(
2564                "not_null_created_at".to_string(),
2565                crate::storage::schema::ConstraintType::NotNull,
2566            )
2567            .on_columns(vec!["created_at".to_string()]),
2568        );
2569        table.constraints.push(
2570            crate::storage::schema::Constraint::new(
2571                "not_null_updated_at".to_string(),
2572                crate::storage::schema::ConstraintType::NotNull,
2573            )
2574            .on_columns(vec!["updated_at".to_string()]),
2575        );
2576    }
2577    table
2578        .validate()
2579        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2580    Ok(table)
2581}
2582
2583fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2584    let data_type = resolve_declared_data_type(&column.data_type)
2585        .map_err(|err| RedDBError::Query(err.to_string()))?;
2586    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2587    if column.not_null {
2588        column_def = column_def.not_null();
2589    }
2590    if let Some(default) = &column.default {
2591        column_def = column_def.with_default(default.as_bytes().to_vec());
2592    }
2593    if column.compress.unwrap_or(0) > 0 {
2594        column_def = column_def.compressed();
2595    }
2596    if !column.enum_variants.is_empty() {
2597        column_def = column_def.with_variants(column.enum_variants.clone());
2598    }
2599    if let Some(precision) = column.decimal_precision {
2600        column_def = column_def.with_precision(precision);
2601    }
2602    if let Some(element_type) = &column.array_element {
2603        column_def = column_def.with_element_type(
2604            resolve_declared_data_type(element_type)
2605                .map_err(|err| RedDBError::Query(err.to_string()))?,
2606        );
2607    }
2608    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2609    if column.unique {
2610        column_def = column_def.with_metadata("unique", "true");
2611    }
2612    if column.primary_key {
2613        column_def = column_def.with_metadata("primary_key", "true");
2614    }
2615    Ok(column_def)
2616}
2617
/// Returns the current wall-clock time as milliseconds since the unix
/// epoch, or `0` when the system clock reads earlier than the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2624
2625#[cfg(test)]
2626mod tests {
2627    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2628    use crate::auth::store::{AuthStore, PrincipalRef};
2629    use crate::auth::UserId;
2630    use crate::auth::{AuthConfig, Role};
2631    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2632    use crate::storage::schema::Value;
2633    use crate::{RedDBOptions, RedDBRuntime};
2634    use std::sync::Arc;
2635
2636    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2637        Policy {
2638            id: id.to_string(),
2639            version: 1,
2640            tenant: None,
2641            created_at: 0,
2642            updated_at: 0,
2643            statements: vec![Statement {
2644                sid: None,
2645                effect: Effect::Allow,
2646                actions: vec![ActionPattern::Exact(action.to_string())],
2647                resources: vec![ResourcePattern::Exact {
2648                    kind: "collection".to_string(),
2649                    name: collection.to_string(),
2650                }],
2651                condition: None,
2652            }],
2653        }
2654    }
2655
2656    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2657        let store = Arc::new(AuthStore::new(AuthConfig::default()));
2658        *rt.inner.auth_store.write() = Some(store.clone());
2659        store
2660    }
2661
2662    #[test]
2663    fn drop_denied_without_iam_policy() {
2664        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2665        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2666        let store = wire_auth_store(&rt);
2667        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
2668        let select_only = Policy {
2669            id: "select-only".to_string(),
2670            version: 1,
2671            tenant: None,
2672            created_at: 0,
2673            updated_at: 0,
2674            statements: vec![Statement {
2675                sid: None,
2676                effect: Effect::Allow,
2677                actions: vec![ActionPattern::Exact("select".to_string())],
2678                resources: vec![ResourcePattern::Wildcard],
2679                condition: None,
2680            }],
2681        };
2682        store.put_policy_internal(select_only).unwrap();
2683        let alice = UserId::from_parts(None, "alice");
2684        store
2685            .attach_policy(PrincipalRef::User(alice), "select-only")
2686            .unwrap();
2687        set_current_auth_identity("alice".to_string(), Role::Write);
2688        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2689        clear_current_auth_identity();
2690        assert!(
2691            format!("{err}").contains("denied by IAM policy"),
2692            "got: {err}"
2693        );
2694    }
2695
2696    #[test]
2697    fn drop_allowed_with_explicit_iam_policy() {
2698        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2699        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2700        let store = wire_auth_store(&rt);
2701        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2702        store.put_policy_internal(policy).unwrap();
2703        let bob = UserId::from_parts(None, "bob");
2704        store
2705            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2706            .unwrap();
2707        set_current_auth_identity("bob".to_string(), Role::Write);
2708        rt.execute_query("DROP TABLE bar").unwrap();
2709        clear_current_auth_identity();
2710    }
2711
2712    #[test]
2713    fn drop_allowed_with_wildcard_iam_policy() {
2714        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2715        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2716        let store = wire_auth_store(&rt);
2717        let policy = Policy {
2718            id: "allow-drop-all".to_string(),
2719            version: 1,
2720            tenant: None,
2721            created_at: 0,
2722            updated_at: 0,
2723            statements: vec![Statement {
2724                sid: None,
2725                effect: Effect::Allow,
2726                actions: vec![ActionPattern::Exact("drop".to_string())],
2727                resources: vec![ResourcePattern::Wildcard],
2728                condition: None,
2729            }],
2730        };
2731        store.put_policy_internal(policy).unwrap();
2732        let carl = UserId::from_parts(None, "carl");
2733        store
2734            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2735            .unwrap();
2736        set_current_auth_identity("carl".to_string(), Role::Write);
2737        rt.execute_query("DROP TABLE baz").unwrap();
2738        clear_current_auth_identity();
2739    }
2740
2741    #[test]
2742    fn truncate_denied_without_iam_policy() {
2743        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2744        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2745        let store = wire_auth_store(&rt);
2746        // Acceptance #2 (#712 / S5A): in `policy_only` mode a
2747        // principal with no matching policy is denied even if their
2748        // role would have permitted the action. The pre-#712 default
2749        // of "no policy → deny" only holds under `policy_only`, so
2750        // pin the mode explicitly here so the assertion holds
2751        // regardless of the construction-time default.
2752        store
2753            .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
2754        // A policy exists (IAM active) but gives no truncate right.
2755        let select_only = Policy {
2756            id: "select-only-2".to_string(),
2757            version: 1,
2758            tenant: None,
2759            created_at: 0,
2760            updated_at: 0,
2761            statements: vec![Statement {
2762                sid: None,
2763                effect: Effect::Allow,
2764                actions: vec![ActionPattern::Exact("select".to_string())],
2765                resources: vec![ResourcePattern::Wildcard],
2766                condition: None,
2767            }],
2768        };
2769        store.put_policy_internal(select_only).unwrap();
2770        let dana = UserId::from_parts(None, "dana");
2771        store
2772            .attach_policy(PrincipalRef::User(dana), "select-only-2")
2773            .unwrap();
2774        set_current_auth_identity("dana".to_string(), Role::Write);
2775        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2776        clear_current_auth_identity();
2777        assert!(
2778            format!("{err}").contains("denied by IAM policy"),
2779            "got: {err}"
2780        );
2781    }
2782
2783    #[test]
2784    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2785        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2786        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2787            .unwrap();
2788        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2789            .unwrap();
2790        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2791            .unwrap();
2792
2793        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2794        assert_eq!(truncated.statement_type, "truncate");
2795        assert_eq!(truncated.affected_rows, 0);
2796
2797        let empty = rt.execute_query("SELECT id FROM users").unwrap();
2798        assert!(empty.result.records.is_empty());
2799
2800        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2801            .unwrap();
2802        let selected = rt
2803            .execute_query("SELECT name FROM users WHERE id = 3")
2804            .unwrap();
2805        let name = selected.result.records[0].get("name").unwrap();
2806        assert_eq!(name, &Value::text("cy"));
2807        assert!(rt.db().collection_contract("users").is_some());
2808        assert!(rt
2809            .inner
2810            .index_store
2811            .list_indices("users")
2812            .iter()
2813            .any(|index| index.name == "idx_users_id"));
2814    }
2815
2816    #[test]
2817    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2818        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2819        rt.execute_query("CREATE QUEUE tasks").unwrap();
2820        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2821
2822        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2823        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2824
2825        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2826        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2827        assert_eq!(
2828            len.result.records[0].get("len"),
2829            Some(&Value::UnsignedInteger(0))
2830        );
2831    }
2832
2833    #[test]
2834    fn truncate_system_schema_is_read_only() {
2835        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2836        let err = rt
2837            .execute_query("TRUNCATE COLLECTION red.collections")
2838            .unwrap_err();
2839        assert!(format!("{err}").contains("system schema is read-only"));
2840    }
2841
2842    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────
2843
2844    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2845        let result = rt
2846            .execute_query(&format!("QUEUE PEEK {queue} 100"))
2847            .expect("peek queue");
2848        result
2849            .result
2850            .records
2851            .iter()
2852            .map(
2853                |record| match record.get("payload").expect("payload column") {
2854                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2855                    other => panic!("expected JSON queue payload, got {other:?}"),
2856                },
2857            )
2858            .collect()
2859    }
2860
2861    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
2862    /// `truncate` event, not one delete event per row.
2863    #[test]
2864    fn truncate_event_enabled_table_emits_single_truncate_event() {
2865        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2866        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2867            .unwrap();
2868        rt.execute_query(
2869            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2870        )
2871        .unwrap();
2872
2873        // Drain the 3 insert events so we start clean.
2874        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2875
2876        rt.execute_query("TRUNCATE TABLE users").unwrap();
2877
2878        let events = queue_payloads(&rt, "users_events");
2879        // Must be exactly 1 truncate event, not 3 delete events.
2880        assert_eq!(
2881            events.len(),
2882            1,
2883            "expected 1 truncate event, got {}",
2884            events.len()
2885        );
2886        let ev = events[0].as_object().expect("event is object");
2887        assert_eq!(
2888            ev.get("op").and_then(crate::json::Value::as_str),
2889            Some("truncate")
2890        );
2891        assert_eq!(
2892            ev.get("collection").and_then(crate::json::Value::as_str),
2893            Some("users")
2894        );
2895        assert_eq!(
2896            ev.get("entities_count")
2897                .and_then(crate::json::Value::as_u64),
2898            Some(3)
2899        );
2900        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2901        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2902        assert!(ev
2903            .get("event_id")
2904            .and_then(crate::json::Value::as_str)
2905            .is_some_and(|s| !s.is_empty()));
2906    }
2907
2908    /// `TRUNCATE users` on a collection without event subscription emits no events.
2909    #[test]
2910    fn truncate_no_events_collection_emits_nothing() {
2911        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2912        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2913            .unwrap();
2914        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2915            .unwrap();
2916        // No EVENTS subscription — truncate must work without touching any queue.
2917        rt.execute_query("TRUNCATE TABLE plain").unwrap();
2918        // No crash, no queue to check. Just verify truncation happened.
2919        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2920        assert!(rows.result.records.is_empty());
2921    }
2922
2923    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
2924    /// `collection_dropped` event. The subscription is removed from the
2925    /// source contract but the target queue is preserved for consumer drain.
2926    #[test]
2927    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2928        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2929        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2930            .unwrap();
2931        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2932            .unwrap();
2933
2934        // Drain insert events so we start clean.
2935        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2936
2937        rt.execute_query("DROP TABLE users").unwrap();
2938
2939        // Queue must still exist with 1 collection_dropped event.
2940        let events = queue_payloads(&rt, "users_events");
2941        assert_eq!(
2942            events.len(),
2943            1,
2944            "expected 1 collection_dropped event, got {}",
2945            events.len()
2946        );
2947        let ev = events[0].as_object().expect("event is object");
2948        assert_eq!(
2949            ev.get("op").and_then(crate::json::Value::as_str),
2950            Some("collection_dropped")
2951        );
2952        assert_eq!(
2953            ev.get("collection").and_then(crate::json::Value::as_str),
2954            Some("users")
2955        );
2956        assert_eq!(
2957            ev.get("final_entities_count")
2958                .and_then(crate::json::Value::as_u64),
2959            Some(2)
2960        );
2961        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2962        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2963        assert!(ev
2964            .get("event_id")
2965            .and_then(crate::json::Value::as_str)
2966            .is_some_and(|s| !s.is_empty()));
2967
2968        // Source collection is gone.
2969        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2970        assert!(
2971            format!("{err}").contains("users"),
2972            "expected not-found error"
2973        );
2974    }
2975
2976    /// `DROP TABLE users` on a collection without event subscription works
2977    /// normally with no event emitted.
2978    #[test]
2979    fn drop_no_events_collection_emits_nothing() {
2980        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2981        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2982            .unwrap();
2983        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2984            .unwrap();
2985        rt.execute_query("DROP TABLE plain").unwrap();
2986        // No crash and collection is gone.
2987        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2988        assert!(format!("{err}").contains("plain"));
2989    }
2990
2991    // ── #297: ops_filter + WHERE filter ────────────────────────────────────
2992
2993    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
2994    #[test]
2995    fn ops_filter_insert_only_ignores_update_and_delete() {
2996        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2997        rt.execute_query(
2998            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2999        )
3000        .unwrap();
3001        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
3002            .unwrap();
3003        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
3004            .unwrap();
3005        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
3006
3007        let events = queue_payloads(&rt, "items_events");
3008        // Only the INSERT should have fired.
3009        assert_eq!(
3010            events.len(),
3011            1,
3012            "expected 1 insert event, got {}",
3013            events.len()
3014        );
3015        assert_eq!(
3016            events[0]
3017                .as_object()
3018                .unwrap()
3019                .get("op")
3020                .and_then(crate::json::Value::as_str),
3021            Some("insert")
3022        );
3023    }
3024
3025    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
3026    #[test]
3027    fn where_filter_skips_rows_that_do_not_match() {
3028        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3029        rt.execute_query(
3030            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
3031        )
3032        .unwrap();
3033
3034        // This row should generate an event.
3035        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
3036            .unwrap();
3037        // This row should NOT generate an event.
3038        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
3039            .unwrap();
3040
3041        let events = queue_payloads(&rt, "users_events");
3042        assert_eq!(
3043            events.len(),
3044            1,
3045            "expected 1 event (only active), got {}",
3046            events.len()
3047        );
3048        let ev = events[0].as_object().unwrap();
3049        assert_eq!(
3050            ev.get("op").and_then(crate::json::Value::as_str),
3051            Some("insert")
3052        );
3053        let after = ev.get("after").unwrap().as_object().unwrap();
3054        assert_eq!(
3055            after.get("status").and_then(crate::json::Value::as_str),
3056            Some("active")
3057        );
3058    }
3059
3060    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
3061    #[test]
3062    fn ops_filter_and_where_filter_combined() {
3063        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3064        rt.execute_query(
3065            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
3066        )
3067        .unwrap();
3068
3069        // INSERT active → event
3070        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
3071            .unwrap();
3072        // INSERT inactive → no event
3073        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
3074            .unwrap();
3075        // UPDATE row 1 to inactive → after = inactive, no event
3076        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
3077            .unwrap();
3078        // DELETE → ops_filter excludes it
3079        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
3080
3081        let events = queue_payloads(&rt, "items_events");
3082        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
3083        assert_eq!(
3084            events.len(),
3085            1,
3086            "expected 1 event, got {}: {events:?}",
3087            events.len()
3088        );
3089        assert_eq!(
3090            events[0]
3091                .as_object()
3092                .unwrap()
3093                .get("op")
3094                .and_then(crate::json::Value::as_str),
3095            Some("insert")
3096        );
3097    }
3098
3099    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
3100    #[test]
3101    fn where_filter_on_delete_checks_before_state() {
3102        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3103        rt.execute_query(
3104            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
3105        )
3106        .unwrap();
3107
3108        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
3109            .unwrap();
3110
3111        // Delete active row → event (before-state was active)
3112        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3113        // Delete inactive row → no event (before-state was inactive)
3114        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
3115
3116        let events = queue_payloads(&rt, "users_events");
3117        assert_eq!(
3118            events.len(),
3119            1,
3120            "expected 1 delete event, got {}",
3121            events.len()
3122        );
3123        let ev = events[0].as_object().unwrap();
3124        assert_eq!(
3125            ev.get("op").and_then(crate::json::Value::as_str),
3126            Some("delete")
3127        );
3128    }
3129
3130    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────
3131
3132    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
3133    #[test]
3134    fn alter_add_column_on_event_enabled_table_succeeds() {
3135        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3136        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
3137            .unwrap();
3138        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
3139        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
3140            .unwrap();
3141        // The column is now in the contract.
3142        let contract = rt.db().collection_contract("users").unwrap();
3143        assert!(
3144            contract.declared_columns.iter().any(|c| c.name == "phone"),
3145            "phone column should be in contract"
3146        );
3147        // Subscription still enabled after the alter.
3148        assert!(
3149            contract.subscriptions.iter().any(|s| s.enabled),
3150            "subscription should remain enabled"
3151        );
3152    }
3153
3154    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
3155    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
3156    #[test]
3157    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
3158        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3159        rt.execute_query(
3160            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
3161        )
3162        .unwrap();
3163        // DROP COLUMN — schema change event path exercises, must not error.
3164        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
3165            .unwrap();
3166        let contract = rt.db().collection_contract("items").unwrap();
3167        assert!(
3168            !contract.declared_columns.iter().any(|c| c.name == "secret"),
3169            "secret column should be removed"
3170        );
3171        // ENABLE RLS — non-column op, no schema-change event (coverage).
3172        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
3173            .unwrap();
3174        // Collection and subscription still intact.
3175        assert!(
3176            contract.subscriptions.iter().any(|s| s.enabled),
3177            "subscription should remain enabled"
3178        );
3179    }
3180
3181    /// Slice G (#675) — every newly-created `CollectionModel::Vector`
3182    /// collection is marked `vector.turbo` and gains a materialised
3183    /// `TurboCollectionState`. The marker is what the SEARCH/INSERT
3184    /// hot paths read to branch between TurboQuant and the legacy
3185    /// brute-force fallback.
3186    #[test]
3187    fn create_vector_marks_collection_as_turbo_baseline() {
3188        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3189        rt.execute_query("CREATE VECTOR embeddings DIM 4").unwrap();
3190        let store = rt.db().store();
3191        assert!(
3192            crate::runtime::vector_turbo_kind::is_turbo(&store, "embeddings"),
3193            "new vector collections must be turbo-marked baseline"
3194        );
3195        assert!(
3196            rt.db().turbo_state("embeddings").is_some(),
3197            "turbo_state must materialise after CREATE VECTOR"
3198        );
3199    }
3200
3201    /// Non-vector collections must NOT be turbo-marked. Guards against
3202    /// the always-baseline change accidentally leaking the marker onto
3203    /// CREATE TABLE / CREATE DOCUMENT / etc.
3204    #[test]
3205    fn create_table_does_not_mark_turbo() {
3206        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3207        rt.execute_query("CREATE TABLE plain (id INT)").unwrap();
3208        let store = rt.db().store();
3209        assert!(
3210            !crate::runtime::vector_turbo_kind::is_turbo(&store, "plain"),
3211            "non-vector collections must not gain the turbo marker"
3212        );
3213        assert!(rt.db().turbo_state("plain").is_none());
3214    }
3215
3216    /// `CREATE COLLECTION KIND vector.turbo` continues to mark the
3217    /// collection (idempotent with the new always-baseline path).
3218    #[test]
3219    fn create_collection_kind_vector_turbo_still_marked() {
3220        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3221        rt.execute_query("CREATE COLLECTION turbo_v KIND vector.turbo DIM 4")
3222            .unwrap();
3223        let store = rt.db().store();
3224        assert!(crate::runtime::vector_turbo_kind::is_turbo(
3225            &store, "turbo_v"
3226        ));
3227        assert!(rt.db().turbo_state("turbo_v").is_some());
3228    }
3229}