Skip to main content

reddb_server/runtime/
impl_ddl.rs

1//! DDL execution: CREATE TABLE, DROP TABLE, ALTER TABLE via SQL AST
2//!
3//! Translates DDL statements into collection-level operations on the
4//! underlying `UnifiedStore`.  RedDB uses a flexible schema-on-read
5//! model, so column definitions are advisory metadata rather than
6//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault KV key under which a collection's own master key is stored.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        crate::reserved_fields::ensure_no_reserved_public_item_fields(
37            query.columns.iter().map(|column| column.name.as_str()),
38            &format!("table '{}'", query.name),
39        )?;
40        // Check if the collection already exists.
41        let exists = store.get_collection(&query.name).is_some();
42        if exists {
43            if query.if_not_exists {
44                return Ok(RuntimeQueryResult::ok_message(
45                    raw_query.to_string(),
46                    &format!("table '{}' already exists", query.name),
47                    "create",
48                ));
49            }
50            return Err(RedDBError::Query(format!(
51                "table '{}' already exists",
52                query.name
53            )));
54        }
55
56        // Build and validate the contract before mutating storage so invalid
57        // SQL types / duplicate columns do not leave partial side effects.
58        let contract = collection_contract_from_create_table(query)?;
59        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60        // Create the collection.
61        store
62            .create_collection(&query.name)
63            .map_err(|err| RedDBError::Internal(err.to_string()))?;
64        for subscription in &contract.subscriptions {
65            ensure_event_target_queue(self, &subscription.target_queue)?;
66        }
67        if let Some(default_ttl_ms) = query.default_ttl_ms {
68            self.inner
69                .db
70                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71        }
72        self.inner
73            .db
74            .save_collection_contract(contract)
75            .map_err(|err| RedDBError::Internal(err.to_string()))?;
76        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77            store.set_config_tree(
78                &format!("red.collection_tenants.{}", query.name),
79                &crate::serde_json::Value::String(tenant_id),
80            );
81        }
82        self.inner
83            .db
84            .persist_metadata()
85            .map_err(|err| RedDBError::Internal(err.to_string()))?;
86        self.refresh_table_planner_stats(&query.name);
87        self.invalidate_result_cache();
88        // Issue #120 — feed the create into the schema-vocabulary
89        // reverse index so AskPipeline (#121) sees this collection.
90        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91        self.schema_vocabulary_apply(
92            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93                collection: query.name.clone(),
94                columns,
95                type_tags: Vec::new(),
96                description: None,
97            },
98        );
99        // Partition metadata (Phase 2.2 PG parity).
100        //
101        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
102        // clause, stamp the partition config into `red_config` under
103        // `partition.{table}.{by,column}`. Children are registered separately
104        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
105        if let Some(spec) = &query.partition_by {
106            let kind_str = match spec.kind {
107                crate::storage::query::ast::PartitionKind::Range => "range",
108                crate::storage::query::ast::PartitionKind::List => "list",
109                crate::storage::query::ast::PartitionKind::Hash => "hash",
110            };
111            store.set_config_tree(
112                &format!("partition.{}.by", query.name),
113                &crate::serde_json::Value::String(kind_str.to_string()),
114            );
115            store.set_config_tree(
116                &format!("partition.{}.column", query.name),
117                &crate::serde_json::Value::String(spec.column.clone()),
118            );
119        }
120
121        // Table-scoped multi-tenancy (Phase 2.5.4).
122        //
123        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
124        //   1. Persists the `tenant_tables.{table}.column` marker so
125        //      INSERTs can auto-fill and future opens re-hydrate.
126        //   2. Registers the table in the in-memory `tenant_tables`
127        //      HashMap used by the DML auto-fill path.
128        //   3. Installs an implicit RLS policy equivalent to
129        //      `USING (col = CURRENT_TENANT())` across all actions.
130        //   4. Flips `rls_enabled_tables` on so the policy applies.
131        if let Some(col) = &query.tenant_by {
132            store.set_config_tree(
133                &format!("tenant_tables.{}.column", query.name),
134                &crate::serde_json::Value::String(col.clone()),
135            );
136            self.register_tenant_table(&query.name, col);
137        }
138
139        let ttl_suffix = query
140            .default_ttl_ms
141            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142            .unwrap_or_default();
143
144        let tenant_suffix = query
145            .tenant_by
146            .as_ref()
147            .map(|col| format!(" (tenant-scoped by {col})"))
148            .unwrap_or_default();
149
150        Ok(RuntimeQueryResult::ok_message(
151            raw_query.to_string(),
152            &format!(
153                "table '{}' created{}{}",
154                query.name, ttl_suffix, tenant_suffix
155            ),
156            "create",
157        ))
158    }
159
160    fn execute_create_keyed_collection(
161        &self,
162        raw_query: &str,
163        query: &CreateTableQuery,
164    ) -> RedDBResult<RuntimeQueryResult> {
165        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166        if is_system_schema_name(&query.name) {
167            return Err(RedDBError::Query("system schema is read-only".to_string()));
168        }
169        let store = self.inner.db.store();
170        let label = polymorphic_resolver::model_name(query.collection_model);
171        if store.get_collection(&query.name).is_some() {
172            if query.if_not_exists {
173                return Ok(RuntimeQueryResult::ok_message(
174                    raw_query.to_string(),
175                    &format!("{label} '{}' already exists", query.name),
176                    "create",
177                ));
178            }
179            return Err(RedDBError::Query(format!(
180                "{label} '{}' already exists",
181                query.name
182            )));
183        }
184
185        store
186            .create_collection(&query.name)
187            .map_err(|err| RedDBError::Internal(err.to_string()))?;
188        if query.collection_model == CollectionModel::Vault {
189            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190            let key_scope = if query.vault_own_master_key {
191                "own"
192            } else {
193                "cluster"
194            };
195            store.set_config_tree(
196                &format!("red.vault.{}.key_scope", query.name),
197                &crate::serde_json::Value::String(key_scope.to_string()),
198            );
199            store.set_config_tree(
200                &format!("red.vault.{}.status", query.name),
201                &crate::serde_json::Value::String("sealed".to_string()),
202            );
203        }
204        if query.collection_model == CollectionModel::Metrics {
205            for spec in &query.metrics_rollup_policies {
206                let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207                    .ok_or_else(|| {
208                    RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209                })?;
210                if policy.source != "raw" {
211                    return Err(RedDBError::Query(format!(
212                        "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213                        spec
214                    )));
215                }
216                if !matches!(
217                    policy.aggregation.as_str(),
218                    "avg" | "sum" | "min" | "max" | "count"
219                ) {
220                    return Err(RedDBError::Query(format!(
221                        "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222                        spec
223                    )));
224                }
225            }
226            if let Some(raw_retention_ms) = query.default_ttl_ms {
227                self.inner
228                    .db
229                    .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230                store.set_config_tree(
231                    &format!("red.metrics.{}.raw_retention_ms", query.name),
232                    &crate::serde_json::Value::Number(raw_retention_ms as f64),
233                );
234            }
235            let tenant_identity = query
236                .tenant_by
237                .clone()
238                .unwrap_or_else(|| "current_tenant".to_string());
239            store.set_config_tree(
240                &format!("red.metrics.{}.tenant_identity", query.name),
241                &crate::serde_json::Value::String(tenant_identity),
242            );
243            store.set_config_tree(
244                &format!("red.metrics.{}.namespace", query.name),
245                &crate::serde_json::Value::String("default".to_string()),
246            );
247            if !query.metrics_rollup_policies.is_empty() {
248                store.set_config_tree(
249                    &format!("red.metrics.{}.rollup_policies", query.name),
250                    &crate::serde_json::Value::Array(
251                        query
252                            .metrics_rollup_policies
253                            .iter()
254                            .cloned()
255                            .map(crate::serde_json::Value::String)
256                            .collect(),
257                    ),
258                );
259            }
260        }
261        let contract = if query.collection_model == CollectionModel::Metrics {
262            metrics_collection_contract(query)
263        } else {
264            keyed_collection_contract(
265                &query.name,
266                query.collection_model,
267                query.analytics_config.clone(),
268            )
269        };
270        self.inner
271            .db
272            .save_collection_contract(contract)
273            .map_err(|err| RedDBError::Internal(err.to_string()))?;
274        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
275            store.set_config_tree(
276                &format!("red.collection_tenants.{}", query.name),
277                &crate::serde_json::Value::String(tenant_id),
278            );
279        }
280        self.inner
281            .db
282            .persist_metadata()
283            .map_err(|err| RedDBError::Internal(err.to_string()))?;
284        self.invalidate_result_cache();
285
286        Ok(RuntimeQueryResult::ok_message(
287            raw_query.to_string(),
288            &format!("{label} '{}' created", query.name),
289            "create",
290        ))
291    }
292
293    /// Bootstrap a system-owned graph collection declared `WITH ANALYTICS`
294    /// (issue #803). The SQL DDL path refuses `red.*` names via
295    /// [`is_system_schema_name`], so built-ins like `red.topology.cluster`
296    /// cannot be created through `CREATE GRAPH`; this method drives the same
297    /// contract-build / save / persist path directly, bypassing only that
298    /// guard. Idempotent: a no-op when the collection already exists (its
299    /// analytics contract is recovered from the WAL on restart), so it is safe
300    /// to call unconditionally on every boot.
301    pub fn ensure_system_graph_with_analytics(
302        &self,
303        name: &str,
304        outputs: &[crate::catalog::AnalyticsOutput],
305    ) -> RedDBResult<()> {
306        let store = self.inner.db.store();
307        if store.get_collection(name).is_some() {
308            return Ok(());
309        }
310        let analytics_config = outputs
311            .iter()
312            .map(|output| crate::catalog::AnalyticsViewDescriptor {
313                output: *output,
314                algorithm: None,
315                resolution: None,
316                max_iterations: None,
317                tolerance: None,
318            })
319            .collect();
320        store
321            .create_collection(name)
322            .map_err(|err| RedDBError::Internal(err.to_string()))?;
323        let contract = keyed_collection_contract(
324            name,
325            crate::catalog::CollectionModel::Graph,
326            analytics_config,
327        );
328        self.inner
329            .db
330            .save_collection_contract(contract)
331            .map_err(|err| RedDBError::Internal(err.to_string()))?;
332        self.inner
333            .db
334            .persist_metadata()
335            .map_err(|err| RedDBError::Internal(err.to_string()))?;
336        self.invalidate_result_cache();
337        Ok(())
338    }
339
340    pub fn execute_create_collection(
341        &self,
342        raw_query: &str,
343        query: &CreateCollectionQuery,
344    ) -> RedDBResult<RuntimeQueryResult> {
345        let model = match query.kind.as_str() {
346            "graph" => CollectionModel::Graph,
347            "document" => CollectionModel::Document,
348            "metrics" => CollectionModel::Metrics,
349            "vector.turbo" => {
350                let dimension = query.vector_dimension.ok_or_else(|| {
351                    RedDBError::Query(
352                        "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
353                    )
354                })?;
355                let create = CreateVectorQuery {
356                    name: query.name.clone(),
357                    dimension,
358                    metric: query
359                        .vector_metric
360                        .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
361                    if_not_exists: query.if_not_exists,
362                };
363                let result = self.execute_create_vector(raw_query, &create)?;
364                // Issue #693 — durable kind marker that distinguishes
365                // this collection from the legacy `vector` path on
366                // every restart. Runtime state (TurboQuantIndex +
367                // TurboExtent) is lazily materialised by
368                // `RedDB::turbo_state` on first INSERT/SEARCH so the
369                // marker is the only thing that needs to survive.
370                let store = self.inner.db.store();
371                crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
372                self.inner
373                    .db
374                    .persist_metadata()
375                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
376                // Eagerly construct the state so the TurboExtent (if
377                // any) is reserved at creation time rather than on
378                // first INSERT. Errors are swallowed: the lazy path in
379                // `turbo_state` will retry on the next access.
380                let _ = self.inner.db.turbo_state(&query.name);
381                return Ok(result);
382            }
383            // KIND blockchain — issue #523 foundation: stored on top of a
384            // Table-shaped collection. The `chain` marker + reserved-column
385            // discipline make the difference. Schema validation, conflict
386            // retries, and `verify_chain` come in later iterations.
387            "blockchain" => CollectionModel::Table,
388            other => {
389                return Err(RedDBError::Query(format!(
390                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
391                )));
392            }
393        };
394        let create = CreateTableQuery {
395            collection_model: model,
396            name: query.name.clone(),
397            columns: Vec::new(),
398            if_not_exists: query.if_not_exists,
399            default_ttl_ms: None,
400            metrics_rollup_policies: Vec::new(),
401            context_index_fields: Vec::new(),
402            context_index_enabled: false,
403            timestamps: false,
404            partition_by: None,
405            tenant_by: None,
406            append_only: false,
407            subscriptions: Vec::new(),
408            analytics_config: Vec::new(),
409            vault_own_master_key: false,
410        };
411        let result = self.execute_create_table(raw_query, &create)?;
412        if query.kind == "blockchain" {
413            self.install_blockchain_kind(&query.name)?;
414        }
415        // Issue #522 — wire `SIGNED_BY (...)` into the runtime. The parser
416        // already produces a validated 32-byte pubkey list; installing
417        // the registry stamps the per-collection signer set into
418        // `red_config` so the INSERT path can load it cheaply.
419        if !query.allowed_signers.is_empty() {
420            let actor = crate::runtime::impl_core::current_user_projected()
421                .unwrap_or_else(|| "@system/create-collection".to_string());
422            crate::runtime::signed_writes_kind::install(
423                &self.inner.db.store(),
424                &query.name,
425                &query.allowed_signers,
426                &actor,
427            );
428        }
429        Ok(result)
430    }
431
432    /// Stamp `red.collection.{name}.kind = "chain"` and append the genesis
433    /// row. Idempotent against `IF NOT EXISTS`: if the collection already
434    /// has a row at height 0 we leave it.
435    fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
436        use crate::runtime::blockchain_kind;
437        use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
438        use std::sync::Arc;
439
440        let store = self.inner.db.store();
441        blockchain_kind::mark_as_chain(&store, name);
442
443        let existing_tip = blockchain_kind::chain_tip(&store, name);
444        if existing_tip.height.is_some() {
445            return Ok(());
446        }
447
448        let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
449        let named: std::collections::HashMap<String, crate::storage::schema::Value> =
450            fields.into_iter().collect();
451        let entity = UnifiedEntity::new(
452            EntityId::new(0),
453            EntityKind::TableRow {
454                table: Arc::from(name),
455                row_id: 0,
456            },
457            EntityData::Row(RowData {
458                columns: Vec::new(),
459                named: Some(named),
460                schema: None,
461            }),
462        );
463        store
464            .insert_auto(name, entity)
465            .map_err(|err| RedDBError::Internal(err.to_string()))?;
466        // #524: prime the in-memory tip cache so the chain-tip endpoint and
467        // subsequent INSERTs don't have to scan the collection to find genesis.
468        if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
469            self.inner
470                .chain_tip_cache
471                .lock()
472                .insert(name.to_string(), tip);
473        }
474        Ok(())
475    }
476
477    pub fn execute_create_vector(
478        &self,
479        raw_query: &str,
480        query: &CreateVectorQuery,
481    ) -> RedDBResult<RuntimeQueryResult> {
482        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
483        if is_system_schema_name(&query.name) {
484            return Err(RedDBError::Query("system schema is read-only".to_string()));
485        }
486        let store = self.inner.db.store();
487        if store.get_collection(&query.name).is_some() {
488            if query.if_not_exists {
489                return Ok(RuntimeQueryResult::ok_message(
490                    raw_query.to_string(),
491                    &format!("vector '{}' already exists", query.name),
492                    "create",
493                ));
494            }
495            return Err(RedDBError::Query(format!(
496                "vector '{}' already exists",
497                query.name
498            )));
499        }
500
501        store
502            .create_collection(&query.name)
503            .map_err(|err| RedDBError::Internal(err.to_string()))?;
504        self.inner
505            .db
506            .save_collection_contract(vector_collection_contract(query))
507            .map_err(|err| RedDBError::Internal(err.to_string()))?;
508        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
509            store.set_config_tree(
510                &format!("red.collection_tenants.{}", query.name),
511                &crate::serde_json::Value::String(tenant_id),
512            );
513        }
514        // Slice G (#675) — register `vector.turbo` as the built-in
515        // baseline for every newly-created vector collection. The
516        // marker is the durable signal `RedDB::turbo_state` reads to
517        // materialise the TurboQuantIndex + TurboExtent on first
518        // INSERT/SEARCH. Pre-existing vector collections were created
519        // without the marker and stay on the brute-force fallback path
520        // until they are explicitly migrated — there is no implicit
521        // backfill in this slice.
522        crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
523        self.inner
524            .db
525            .persist_metadata()
526            .map_err(|err| RedDBError::Internal(err.to_string()))?;
527        // Eagerly materialise the per-collection turbo state so the
528        // TurboExtent (when the store is paged) is reserved at CREATE
529        // time rather than on the first INSERT. Failures are swallowed
530        // — the lazy path in `turbo_state` retries on next access.
531        let _ = self.inner.db.turbo_state(&query.name);
532        self.invalidate_result_cache();
533
534        Ok(RuntimeQueryResult::ok_message(
535            raw_query.to_string(),
536            &format!("vector '{}' created", query.name),
537            "create",
538        ))
539    }
540
541    fn provision_vault_key_material(
542        &self,
543        collection: &str,
544        own_master_key: bool,
545    ) -> RedDBResult<()> {
546        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
547            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
548        })?;
549        if !auth_store.is_vault_backed() {
550            return Err(RedDBError::Query(
551                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
552            ));
553        }
554
555        if auth_store.vault_secret_key().is_none() {
556            let key = crate::auth::store::random_bytes(32);
557            auth_store
558                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
559                .map_err(|err| RedDBError::Query(err.to_string()))?;
560        }
561
562        if own_master_key {
563            let key = crate::auth::store::random_bytes(32);
564            auth_store
565                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
566                .map_err(|err| RedDBError::Query(err.to_string()))?;
567        }
568
569        Ok(())
570    }
571
572    /// Execute DROP TABLE
573    ///
574    /// Drops the collection and all its data from the store.
575    pub fn execute_drop_table(
576        &self,
577        raw_query: &str,
578        query: &DropTableQuery,
579    ) -> RedDBResult<RuntimeQueryResult> {
580        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
581        let store = self.inner.db.store();
582
583        if is_system_schema_name(&query.name) {
584            return Err(RedDBError::Query("system schema is read-only".to_string()));
585        }
586
587        let exists = store.get_collection(&query.name).is_some();
588        if !exists {
589            if query.if_exists {
590                return Ok(RuntimeQueryResult::ok_message(
591                    raw_query.to_string(),
592                    &format!("table '{}' does not exist", query.name),
593                    "drop",
594                ));
595            }
596            return Err(RedDBError::NotFound(format!(
597                "table '{}' not found",
598                query.name
599            )));
600        }
601        let actual =
602            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
603        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
604
605        // Emit 1 collection_dropped event before storage is wiped.
606        // Queue is preserved; subscription is removed with the contract below.
607        let final_count = store
608            .get_collection(&query.name)
609            .map(|manager| manager.query_all(|_| true).len() as u64)
610            .unwrap_or(0);
611        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
612            self,
613            &query.name,
614            final_count,
615        )?;
616
617        let orphaned_indices: Vec<String> = self
618            .inner
619            .index_store
620            .list_indices(&query.name)
621            .into_iter()
622            .map(|index| index.name)
623            .collect();
624        for name in &orphaned_indices {
625            self.inner.index_store.drop_index(name, &query.name);
626        }
627
628        store
629            .drop_collection(&query.name)
630            .map_err(|err| RedDBError::Internal(err.to_string()))?;
631        self.inner.db.invalidate_vector_index(&query.name);
632        self.inner.db.clear_collection_default_ttl_ms(&query.name);
633        self.inner
634            .db
635            .remove_collection_contract(&query.name)
636            .map_err(|err| RedDBError::Internal(err.to_string()))?;
637        self.clear_table_planner_stats(&query.name);
638        self.invalidate_result_cache();
639        // Issue #119: a dropped collection vanishes from every
640        // (tenant, role)'s visible-collections set. Auth is optional in
641        // embedded mode so guard the call.
642        if let Some(store) = self.inner.auth_store.read().clone() {
643            store.invalidate_visible_collections_cache();
644        }
645        self.inner
646            .db
647            .persist_metadata()
648            .map_err(|err| RedDBError::Internal(err.to_string()))?;
649        // Issue #120 — drop both the collection entries *and* every
650        // index entry that was scoped to this collection.  Dropping the
651        // collection wipes columns + collection-name + type-tags +
652        // index hits in one pass via `purge_collection_entries`, so
653        // the explicit `DropIndex` calls would be redundant.
654        self.schema_vocabulary_apply(
655            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
656                collection: query.name.clone(),
657            },
658        );
659
660        Ok(RuntimeQueryResult::ok_message(
661            raw_query.to_string(),
662            &format!("table '{}' dropped", query.name),
663            "drop",
664        ))
665    }
666
667    pub fn execute_drop_graph(
668        &self,
669        raw_query: &str,
670        query: &DropGraphQuery,
671    ) -> RedDBResult<RuntimeQueryResult> {
672        self.execute_drop_typed_collection(
673            raw_query,
674            &query.name,
675            query.if_exists,
676            CollectionModel::Graph,
677            "graph",
678        )
679    }
680
681    pub fn execute_drop_vector(
682        &self,
683        raw_query: &str,
684        query: &DropVectorQuery,
685    ) -> RedDBResult<RuntimeQueryResult> {
686        self.execute_drop_typed_collection(
687            raw_query,
688            &query.name,
689            query.if_exists,
690            CollectionModel::Vector,
691            "vector",
692        )
693    }
694
695    pub fn execute_drop_document(
696        &self,
697        raw_query: &str,
698        query: &DropDocumentQuery,
699    ) -> RedDBResult<RuntimeQueryResult> {
700        self.execute_drop_typed_collection(
701            raw_query,
702            &query.name,
703            query.if_exists,
704            CollectionModel::Document,
705            "document",
706        )
707    }
708
709    pub fn execute_drop_kv(
710        &self,
711        raw_query: &str,
712        query: &DropKvQuery,
713    ) -> RedDBResult<RuntimeQueryResult> {
714        let label = polymorphic_resolver::model_name(query.model);
715        self.execute_drop_typed_collection(
716            raw_query,
717            &query.name,
718            query.if_exists,
719            query.model,
720            label,
721        )
722    }
723
724    pub fn execute_drop_collection(
725        &self,
726        raw_query: &str,
727        query: &DropCollectionQuery,
728    ) -> RedDBResult<RuntimeQueryResult> {
729        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
730        if is_system_schema_name(&query.name) {
731            return Err(RedDBError::Query("system schema is read-only".to_string()));
732        }
733        let store = self.inner.db.store();
734        if store.get_collection(&query.name).is_none() {
735            if query.if_exists {
736                return Ok(RuntimeQueryResult::ok_message(
737                    raw_query.to_string(),
738                    &format!("collection '{}' does not exist", query.name),
739                    "drop",
740                ));
741            }
742            return Err(RedDBError::NotFound(format!(
743                "collection '{}' not found",
744                query.name
745            )));
746        }
747
748        let actual =
749            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
750        if let Some(expected) = query.model {
751            polymorphic_resolver::ensure_model_match(expected, actual)?;
752        }
753
754        match actual {
755            CollectionModel::Table => self.execute_drop_table(
756                raw_query,
757                &DropTableQuery {
758                    name: query.name.clone(),
759                    if_exists: query.if_exists,
760                },
761            ),
762            CollectionModel::TimeSeries => self.execute_drop_timeseries(
763                raw_query,
764                &DropTimeSeriesQuery {
765                    name: query.name.clone(),
766                    if_exists: query.if_exists,
767                },
768            ),
769            CollectionModel::Queue => self.execute_drop_queue(
770                raw_query,
771                &DropQueueQuery {
772                    name: query.name.clone(),
773                    if_exists: query.if_exists,
774                },
775            ),
776            CollectionModel::Graph => self.execute_drop_graph(
777                raw_query,
778                &DropGraphQuery {
779                    name: query.name.clone(),
780                    if_exists: query.if_exists,
781                },
782            ),
783            CollectionModel::Vector => self.execute_drop_vector(
784                raw_query,
785                &DropVectorQuery {
786                    name: query.name.clone(),
787                    if_exists: query.if_exists,
788                },
789            ),
790            CollectionModel::Document => self.execute_drop_document(
791                raw_query,
792                &DropDocumentQuery {
793                    name: query.name.clone(),
794                    if_exists: query.if_exists,
795                },
796            ),
797            CollectionModel::Kv => self.execute_drop_kv(
798                raw_query,
799                &DropKvQuery {
800                    name: query.name.clone(),
801                    if_exists: query.if_exists,
802                    model: CollectionModel::Kv,
803                },
804            ),
805            CollectionModel::Config => self.execute_drop_kv(
806                raw_query,
807                &DropKvQuery {
808                    name: query.name.clone(),
809                    if_exists: query.if_exists,
810                    model: CollectionModel::Config,
811                },
812            ),
813            CollectionModel::Vault => self.execute_drop_kv(
814                raw_query,
815                &DropKvQuery {
816                    name: query.name.clone(),
817                    if_exists: query.if_exists,
818                    model: CollectionModel::Vault,
819                },
820            ),
821            CollectionModel::Hll => self.execute_probabilistic_command(
822                raw_query,
823                &ProbabilisticCommand::DropHll {
824                    name: query.name.clone(),
825                    if_exists: query.if_exists,
826                },
827            ),
828            CollectionModel::Sketch => self.execute_probabilistic_command(
829                raw_query,
830                &ProbabilisticCommand::DropSketch {
831                    name: query.name.clone(),
832                    if_exists: query.if_exists,
833                },
834            ),
835            CollectionModel::Filter => self.execute_probabilistic_command(
836                raw_query,
837                &ProbabilisticCommand::DropFilter {
838                    name: query.name.clone(),
839                    if_exists: query.if_exists,
840                },
841            ),
842            CollectionModel::Metrics => self.execute_drop_typed_collection(
843                raw_query,
844                &query.name,
845                query.if_exists,
846                CollectionModel::Metrics,
847                "metrics",
848            ),
849            CollectionModel::Mixed => self.execute_drop_typed_collection(
850                raw_query,
851                &query.name,
852                query.if_exists,
853                CollectionModel::Mixed,
854                "collection",
855            ),
856        }
857    }
858
859    /// Issue #801 — guard `ALTER GRAPH ... ADD|DROP ANALYTICS` so analytics
860    /// lifecycle mutations only land on a collection declared as a graph. The
861    /// `<graph>.<output>` resolver only consults `analytics_config` on
862    /// graph-model contracts, so allowing the op on any other model would write
863    /// dead config the reader can never surface.
864    fn require_graph_for_analytics(&self, name: &str) -> RedDBResult<()> {
865        let is_graph = self
866            .inner
867            .db
868            .collection_contract(name)
869            .map(|c| c.declared_model == crate::catalog::CollectionModel::Graph)
870            .unwrap_or(false);
871        if !is_graph {
872            return Err(RedDBError::Query(format!(
873                "ALTER GRAPH ... ANALYTICS: '{name}' is not a graph collection"
874            )));
875        }
876        Ok(())
877    }
878
    /// Execute ALTER TABLE
    ///
    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
    ///
    /// Processing order:
    ///
    /// 1. Pass the DDL write gate and verify the collection exists.
    /// 2. Walk `query.operations` in order. Every operation contributes a
    ///    human-readable message; some also perform their side effect
    ///    immediately (partition / RLS / tenancy keys via `set_config_tree`,
    ///    the in-memory RLS set, the tenant-table registry, VCS versioning,
    ///    the signer registry, event-subscription validation).
    /// 3. Load the collection contract (or a default one), apply all
    ///    operations to it, bump `version`, stamp `updated_at_unix_ms`, and
    ///    save it.
    /// 4. If columns were added or dropped and the saved contract has enabled
    ///    subscriptions, emit `OperatorEvent::SubscriptionSchemaChange`.
    /// 5. Clear planner stats for the table, invalidate the result cache, and
    ///    refresh the schema vocabulary from the post-ALTER contract.
    ///
    /// # Errors
    ///
    /// * Propagates the rejection from the DDL write gate.
    /// * `RedDBError::NotFound` when `query.name` is not a known collection.
    /// * `RedDBError::Query` for ADD/REVOKE SIGNER on a collection without a
    ///   signer registry, SET RETENTION without a timestamp column, analytics
    ///   operations on a non-graph collection, and DROP ANALYTICS of an
    ///   output that is not enabled.
    /// * Errors from `vcs_set_versioned`, `validate_event_subscriptions` and
    ///   `ensure_event_target_queue` are propagated unchanged.
    /// * `RedDBError::Internal` when saving the contract fails.
    ///
    /// NOTE(review): side effects in step 2 are applied eagerly, so an
    /// operation that fails later in the same statement returns early after
    /// earlier operations have already written their state and before the
    /// contract is saved — confirm this partial application is intended.
    pub fn execute_alter_table(
        &self,
        raw_query: &str,
        query: &AlterTableQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
        let store = self.inner.db.store();

        // Verify the table exists.
        if store.get_collection(&query.name).is_none() {
            return Err(RedDBError::NotFound(format!(
                "table '{}' not found",
                query.name
            )));
        }

        // One entry per processed operation; joined into the result message.
        let mut messages = Vec::new();

        // Collect column-level changes upfront for schema-change event emission below.
        // Only ADD COLUMN / DROP COLUMN are tracked here; RENAME COLUMN is not.
        let fields_added: Vec<String> = query
            .operations
            .iter()
            .filter_map(|op| {
                if let AlterOperation::AddColumn(col) = op {
                    Some(col.name.clone())
                } else {
                    None
                }
            })
            .collect();
        let fields_removed: Vec<String> = query
            .operations
            .iter()
            .filter_map(|op| {
                if let AlterOperation::DropColumn(name) = op {
                    Some(name.clone())
                } else {
                    None
                }
            })
            .collect();

        for op in &query.operations {
            match op {
                // The three column operations only report here; nothing is
                // written in this loop for them. Presumably the contract
                // update after the loop records the change — confirm in
                // `apply_alter_operations_to_contract`.
                AlterOperation::AddColumn(col) => {
                    // Schema-on-read: column will be available on next insert.
                    messages.push(format!("column '{}' added", col.name));
                }
                AlterOperation::DropColumn(name) => {
                    messages.push(format!("column '{}' dropped", name));
                }
                AlterOperation::RenameColumn { from, to } => {
                    messages.push(format!("column '{}' renamed to '{}'", from, to));
                }
                AlterOperation::AttachPartition { child, bound } => {
                    // Persist child → parent binding in red_config so the
                    // future planner-side pruner can enumerate children and
                    // evaluate their bounds.
                    store.set_config_tree(
                        &format!("partition.{}.children.{}", query.name, child),
                        &crate::serde_json::Value::String(bound.clone()),
                    );
                    messages.push(format!(
                        "partition '{child}' attached to '{}' ({bound})",
                        query.name
                    ));
                }
                AlterOperation::DetachPartition { child } => {
                    // Overwrite the key written by ATTACH with JSON null.
                    store.set_config_tree(
                        &format!("partition.{}.children.{}", query.name, child),
                        &crate::serde_json::Value::Null,
                    );
                    messages.push(format!(
                        "partition '{child}' detached from '{}'",
                        query.name
                    ));
                }
                AlterOperation::EnableRowLevelSecurity => {
                    self.inner
                        .rls_enabled_tables
                        .write()
                        .insert(query.name.clone());
                    // Persist flag so RLS survives restart via red_config.
                    store.set_config_tree(
                        &format!("rls.enabled.{}", query.name),
                        &crate::serde_json::Value::Bool(true),
                    );
                    self.invalidate_plan_cache();
                    messages.push(format!("row level security enabled on '{}'", query.name));
                }
                AlterOperation::DisableRowLevelSecurity => {
                    // Mirror of ENABLE: drop the in-memory entry and null out
                    // the persisted flag.
                    self.inner.rls_enabled_tables.write().remove(&query.name);
                    store.set_config_tree(
                        &format!("rls.enabled.{}", query.name),
                        &crate::serde_json::Value::Null,
                    );
                    self.invalidate_plan_cache();
                    messages.push(format!("row level security disabled on '{}'", query.name));
                }
                // Phase 2.5.4: retrofit tenancy onto an existing table.
                AlterOperation::EnableTenancy { column } => {
                    store.set_config_tree(
                        &format!("tenant_tables.{}.column", query.name),
                        &crate::serde_json::Value::String(column.clone()),
                    );
                    self.register_tenant_table(&query.name, column);
                    self.invalidate_plan_cache();
                    messages.push(format!(
                        "tenancy enabled on '{}' by column '{column}'",
                        query.name
                    ));
                }
                AlterOperation::DisableTenancy => {
                    store.set_config_tree(
                        &format!("tenant_tables.{}.column", query.name),
                        &crate::serde_json::Value::Null,
                    );
                    self.unregister_tenant_table(&query.name);
                    self.invalidate_plan_cache();
                    messages.push(format!("tenancy disabled on '{}'", query.name));
                }
                AlterOperation::SetAppendOnly(on) => {
                    // Contract is the single source of truth for the
                    // UPDATE/DELETE parse-time guard. The flag lands
                    // below via `apply_alter_operations_to_contract`;
                    // here we only publish the human-readable message.
                    messages.push(format!(
                        "append_only {} on '{}'",
                        if *on { "enabled" } else { "disabled" },
                        query.name
                    ));
                }
                AlterOperation::SetVersioned(on) => {
                    // Opt the collection into (or out of) Git-for-Data.
                    // Persists a row in red_vcs_settings; next AS OF /
                    // merge / diff against this table honours the
                    // flag. Retroactive: existing row versions whose
                    // xmins are still pinned by commits become
                    // reachable via AS OF COMMIT immediately.
                    self.vcs_set_versioned(&query.name, *on)?;
                    messages.push(format!(
                        "versioned {} on '{}'",
                        if *on { "enabled" } else { "disabled" },
                        query.name
                    ));
                }
                AlterOperation::EnableEvents(subscription) => {
                    // Work on a copy whose source is forced to this table,
                    // regardless of what the parsed subscription carried.
                    let mut subscription = subscription.clone();
                    subscription.source = query.name.clone();
                    validate_event_subscriptions(
                        self,
                        &query.name,
                        std::slice::from_ref(&subscription),
                    )?;
                    // NOTE(review): presumably creates the target queue when
                    // it is missing — confirm in `ensure_event_target_queue`.
                    ensure_event_target_queue(self, &subscription.target_queue)?;
                    messages.push(format!(
                        "events enabled on '{}' to '{}'",
                        query.name, subscription.target_queue
                    ));
                }
                AlterOperation::DisableEvents => {
                    // Message only; no state is touched in this loop.
                    messages.push(format!("events disabled on '{}'", query.name));
                }
                AlterOperation::AddSubscription { name, descriptor } => {
                    // Same validation path as ENABLE EVENTS, with the
                    // subscription name taken from the statement.
                    let mut sub = descriptor.clone();
                    sub.name = name.clone();
                    sub.source = query.name.clone();
                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
                    ensure_event_target_queue(self, &sub.target_queue)?;
                    messages.push(format!(
                        "subscription '{}' added on '{}' to '{}'",
                        name, query.name, sub.target_queue
                    ));
                }
                AlterOperation::DropSubscription { name } => {
                    // Message only; no state is touched in this loop.
                    messages.push(format!(
                        "subscription '{}' dropped on '{}'",
                        name, query.name
                    ));
                }
                AlterOperation::AddSigner { pubkey } => {
                    // Issue #522 — admin-gated registry mutation. The
                    // standard DDL `check_write` above gates by role; we
                    // additionally verify the collection actually has a
                    // signed-writes registry installed so this isn't a
                    // covert way to retrofit one (use `CREATE COLLECTION
                    // ... SIGNED_BY (...)` for that).
                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
                        return Err(RedDBError::Query(format!(
                            "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
                             recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
                            query.name
                        )));
                    }
                    // Attribute the change to the current user when one is
                    // available, otherwise to a fixed system actor.
                    let actor = crate::runtime::impl_core::current_user_projected()
                        .unwrap_or_else(|| "@system/alter".to_string());
                    let changed = crate::runtime::signed_writes_kind::add_signer(
                        &store,
                        &query.name,
                        *pubkey,
                        &actor,
                    );
                    messages.push(format!(
                        "signer {} on '{}'",
                        if changed { "added" } else { "already present" },
                        query.name
                    ));
                }
                AlterOperation::RevokeSigner { pubkey } => {
                    // Same registry precondition as ADD SIGNER.
                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
                        return Err(RedDBError::Query(format!(
                            "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
                            query.name
                        )));
                    }
                    let actor = crate::runtime::impl_core::current_user_projected()
                        .unwrap_or_else(|| "@system/alter".to_string());
                    let changed = crate::runtime::signed_writes_kind::revoke_signer(
                        &store,
                        &query.name,
                        pubkey,
                        &actor,
                    );
                    messages.push(format!(
                        "signer {} on '{}'",
                        if changed {
                            "revoked"
                        } else {
                            "already revoked"
                        },
                        query.name
                    ));
                }
                AlterOperation::SetRetention { duration_ms } => {
                    // Issue #580 — validate that the collection has a
                    // timestamp column the lazy-on-scan filter can key
                    // off. Without one there is no anchor for "older
                    // than now - duration"; reject at ALTER time so the
                    // policy can never silently hide all rows.
                    let existing = self.inner.db.collection_contract(&query.name);
                    // A collection without a stored contract counts as
                    // having no timestamp column.
                    let has_ts_column = existing
                        .as_ref()
                        .map(retention_timestamp_column_exists)
                        .unwrap_or(false);
                    if !has_ts_column {
                        return Err(RedDBError::Query(format!(
                            "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
                             column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
                             or enable WITH timestamps = true before setting a \
                             retention policy",
                            query.name
                        )));
                    }
                    messages.push(format!(
                        "retention set to {duration_ms} ms on '{}'",
                        query.name
                    ));
                }
                AlterOperation::UnsetRetention => {
                    messages.push(format!("retention cleared on '{}'", query.name));
                }
                AlterOperation::AddAnalytics(views) => {
                    // Issue #801 — analytics lifecycle is graph-only. Reject the
                    // op on a non-graph collection so a misrouted ALTER fails
                    // loudly instead of silently writing analytics_config onto a
                    // model whose reader never resolves `<name>.<output>`.
                    self.require_graph_for_analytics(&query.name)?;
                    let existing = self.inner.db.collection_contract(&query.name);
                    // Outputs already present in the stored contract; computed
                    // once, before any view of this statement is considered.
                    let enabled: std::collections::BTreeSet<crate::catalog::AnalyticsOutput> =
                        existing
                            .as_ref()
                            .map(|c| c.analytics_config.iter().map(|v| v.output).collect())
                            .unwrap_or_default();
                    for view in views {
                        if enabled.contains(&view.output) {
                            // Idempotent: adding an already-enabled output is a
                            // no-op — no error, no duplicate state.
                            messages.push(format!(
                                "analytics '{}' already enabled on '{}'",
                                view.output.as_str(),
                                query.name
                            ));
                        } else {
                            messages.push(format!(
                                "analytics '{}' enabled on '{}'",
                                view.output.as_str(),
                                query.name
                            ));
                        }
                    }
                }
                AlterOperation::DropAnalytics(output) => {
                    self.require_graph_for_analytics(&query.name)?;
                    let enabled = self
                        .inner
                        .db
                        .collection_contract(&query.name)
                        .map(|c| c.analytics_config.iter().any(|v| v.output == *output))
                        .unwrap_or(false);
                    if !enabled {
                        // Dropping an output that was never enabled is a clean,
                        // explicit error (AC4) rather than a silent no-op.
                        return Err(RedDBError::Query(format!(
                            "ALTER GRAPH DROP ANALYTICS: analytics output '{}' is not enabled on graph '{}'",
                            output.as_str(),
                            query.name
                        )));
                    }
                    messages.push(format!(
                        "analytics '{}' disabled on '{}'",
                        output.as_str(),
                        query.name
                    ));
                }
            }
        }

        // Fold every operation into the contract, falling back to a default
        // contract when the collection has none stored yet.
        let mut contract = self
            .inner
            .db
            .collection_contract(&query.name)
            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
        apply_alter_operations_to_contract(&mut contract, &query.operations);
        // Saturating: the version counter pins at its maximum instead of
        // overflowing.
        contract.version = contract.version.saturating_add(1);
        contract.updated_at_unix_ms = current_unix_ms();
        self.inner
            .db
            .save_collection_contract(contract)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        // Issue #301 — emit OperatorEvent when column schema changes on a
        // collection that has active event subscriptions, so operators know
        // downstream consumers may see a different payload shape.
        if !fields_added.is_empty() || !fields_removed.is_empty() {
            // Re-read the contract that was just saved; only enabled
            // subscriptions are reported.
            let sub_names: Vec<String> = self
                .inner
                .db
                .collection_contract(&query.name)
                .map(|c| {
                    c.subscriptions
                        .iter()
                        .filter(|s| s.enabled)
                        .map(|s| s.name.clone())
                        .collect()
                })
                .unwrap_or_default();
            if !sub_names.is_empty() {
                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
                    collection: query.name.clone(),
                    subscription_names: sub_names.join(", "),
                    fields_added: fields_added.join(", "),
                    fields_removed: fields_removed.join(", "),
                    lsn: self.cdc_current_lsn(),
                }
                .emit_global();
            }
        }

        self.clear_table_planner_stats(&query.name);
        self.invalidate_result_cache();
        // Issue #120 — refresh the schema-vocabulary entries from the
        // post-ALTER contract. Drop+recreate inside the index keeps
        // the invalidation guarantee complete (no stale columns from
        // before an ALTER ... DROP COLUMN).
        let post_alter_columns: Vec<String> = self
            .inner
            .db
            .collection_contract(&query.name)
            .map(|contract| {
                contract
                    .declared_columns
                    .iter()
                    .map(|col| col.name.clone())
                    .collect()
            })
            .unwrap_or_default();
        // Only the column list is refreshed; type tags and description are
        // passed empty.
        self.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
                collection: query.name.clone(),
                columns: post_alter_columns,
                type_tags: Vec::new(),
                description: None,
            },
        );

        // An ALTER with an empty operation list still bumps the contract
        // version above; the message says so explicitly.
        let message = if messages.is_empty() {
            format!("table '{}' altered (no operations)", query.name)
        } else {
            format!("table '{}' altered: {}", query.name, messages.join(", "))
        };

        Ok(RuntimeQueryResult::ok_message(
            raw_query.to_string(),
            &message,
            "alter",
        ))
    }
1280
1281    /// Execute EXPLAIN ALTER FOR CREATE TABLE
1282    ///
1283    /// Pure read: computes the schema diff between the target table's
1284    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
1285    /// and returns it as SQL `ALTER TABLE` text (default) or structured
1286    /// JSON. Never mutates storage.
1287    pub fn execute_explain_alter(
1288        &self,
1289        raw_query: &str,
1290        query: &ExplainAlterQuery,
1291    ) -> RedDBResult<RuntimeQueryResult> {
1292        // Validate the target CREATE TABLE body so syntactically valid
1293        // but semantically broken targets (bad SQL types, duplicate
1294        // columns) are caught here rather than inside the diff engine.
1295        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1296
1297        let current_contract = self.inner.db.collection_contract(&query.target.name);
1298
1299        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1300            .as_ref()
1301            .map(|c| c.declared_columns.clone())
1302            .unwrap_or_default();
1303
1304        let diff = super::schema_diff::compute_column_diff(
1305            &query.target.name,
1306            &current_columns,
1307            &query.target.columns,
1308        );
1309
1310        let rendered = match query.format {
1311            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1312            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1313        };
1314
1315        let format_label = match query.format {
1316            ExplainFormat::Sql => "sql",
1317            ExplainFormat::Json => "json",
1318        };
1319
1320        let columns = vec![
1321            "table".to_string(),
1322            "format".to_string(),
1323            "diff".to_string(),
1324        ];
1325        let row = vec![
1326            ("table".to_string(), Value::text(query.target.name.clone())),
1327            ("format".to_string(), Value::text(format_label.to_string())),
1328            ("diff".to_string(), Value::text(rendered)),
1329        ];
1330
1331        Ok(RuntimeQueryResult::ok_records(
1332            raw_query.to_string(),
1333            columns,
1334            vec![row],
1335            "explain",
1336        ))
1337    }
1338
1339    /// Execute CREATE INDEX
1340    ///
1341    /// Registers a new index on a collection, builds it from existing data,
1342    /// and makes it available to the query executor for O(1) lookups.
1343    pub fn execute_create_index(
1344        &self,
1345        raw_query: &str,
1346        query: &CreateIndexQuery,
1347    ) -> RedDBResult<RuntimeQueryResult> {
1348        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1349        let store = self.inner.db.store();
1350
1351        // Verify the table exists
1352        let manager = store
1353            .get_collection(&query.table)
1354            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1355
1356        let method_kind = match query.method {
1357            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1358            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1359            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1360            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1361        };
1362
1363        // Extract fields from existing entities for indexing. Row
1364        // entities may arrive in either the "named" HashMap layout
1365        // (gRPC `BulkInsertBinary` path) OR the columnar layout
1366        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
1367        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
1368        // leaves `named == None`). Prior to this commit the columnar
1369        // branch returned an empty field list, so `CREATE INDEX` built
1370        // a zero-entity index over HTTP-inserted data even though the
1371        // data was queryable via `SELECT`.
1372        let entities = manager.query_all(|_| true);
1373        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1374            entities
1375                .iter()
1376                .map(|e| {
1377                    let fields = match &e.data {
1378                        crate::storage::EntityData::Row(row) => {
1379                            if let Some(ref named) = row.named {
1380                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1381                            } else if let Some(ref schema) = row.schema {
1382                                // Columnar layout — pair each column
1383                                // with its positional name from the
1384                                // shared schema Arc.
1385                                schema
1386                                    .iter()
1387                                    .zip(row.columns.iter())
1388                                    .map(|(k, v)| (k.clone(), v.clone()))
1389                                    .collect()
1390                            } else {
1391                                Vec::new()
1392                            }
1393                        }
1394                        crate::storage::EntityData::Node(node) => node
1395                            .properties
1396                            .iter()
1397                            .map(|(k, v)| (k.clone(), v.clone()))
1398                            .collect(),
1399                        _ => Vec::new(),
1400                    };
1401                    (e.id, fields)
1402                })
1403                .collect();
1404
1405        // Build the index
1406        let indexed_count = self
1407            .inner
1408            .index_store
1409            .create_index(
1410                &query.name,
1411                &query.table,
1412                &query.columns,
1413                method_kind,
1414                query.unique,
1415                &entity_fields,
1416            )
1417            .map_err(RedDBError::Internal)?;
1418
1419        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1420            &query.table,
1421            &entity_fields,
1422        );
1423        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1424        self.invalidate_plan_cache();
1425
1426        // Register metadata
1427        self.inner
1428            .index_store
1429            .register(super::index_store::RegisteredIndex {
1430                name: query.name.clone(),
1431                collection: query.table.clone(),
1432                columns: query.columns.clone(),
1433                method: method_kind,
1434                unique: query.unique,
1435            });
1436        // Issue #120 — surface the index name + indexed columns in
1437        // the schema-vocabulary so AskPipeline (#121) can resolve
1438        // "the email index" back to its collection.
1439        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1440            collection: query.table.clone(),
1441            index: query.name.clone(),
1442            columns: query.columns.clone(),
1443        });
1444
1445        let method_str = format!("{}", query.method);
1446        let unique_str = if query.unique { "unique " } else { "" };
1447        let cols = query.columns.join(", ");
1448
1449        Ok(RuntimeQueryResult::ok_message(
1450            raw_query.to_string(),
1451            &format!(
1452                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1453                unique_str, query.name, query.table, cols, method_str, indexed_count
1454            ),
1455            "create",
1456        ))
1457    }
1458
1459    /// Execute DROP INDEX
1460    ///
1461    /// Removes an index from a collection.
1462    pub fn execute_drop_index(
1463        &self,
1464        raw_query: &str,
1465        query: &DropIndexQuery,
1466    ) -> RedDBResult<RuntimeQueryResult> {
1467        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1468        let store = self.inner.db.store();
1469
1470        // Verify the table exists
1471        if store.get_collection(&query.table).is_none() {
1472            if query.if_exists {
1473                return Ok(RuntimeQueryResult::ok_message(
1474                    raw_query.to_string(),
1475                    &format!("table '{}' does not exist", query.table),
1476                    "drop",
1477                ));
1478            }
1479            return Err(RedDBError::NotFound(format!(
1480                "table '{}' not found",
1481                query.table
1482            )));
1483        }
1484
1485        // Remove from IndexStore
1486        self.inner.index_store.drop_index(&query.name, &query.table);
1487        self.invalidate_plan_cache();
1488        // Issue #120 — keep the schema-vocabulary index entry in sync.
1489        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1490            collection: query.table.clone(),
1491            index: query.name.clone(),
1492        });
1493
1494        Ok(RuntimeQueryResult::ok_message(
1495            raw_query.to_string(),
1496            &format!("index '{}' dropped from '{}'", query.name, query.table),
1497            "drop",
1498        ))
1499    }
1500
1501    fn execute_drop_typed_collection(
1502        &self,
1503        raw_query: &str,
1504        name: &str,
1505        if_exists: bool,
1506        expected_model: CollectionModel,
1507        label: &str,
1508    ) -> RedDBResult<RuntimeQueryResult> {
1509        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1510        if is_system_schema_name(name) {
1511            return Err(RedDBError::Query("system schema is read-only".to_string()));
1512        }
1513        let store = self.inner.db.store();
1514        if store.get_collection(name).is_none() {
1515            if if_exists {
1516                return Ok(RuntimeQueryResult::ok_message(
1517                    raw_query.to_string(),
1518                    &format!("{label} '{name}' does not exist"),
1519                    "drop",
1520                ));
1521            }
1522            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1523        }
1524
1525        let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1526        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1527        self.drop_collection_storage(raw_query, name, label)
1528    }
1529
1530    pub fn execute_truncate(
1531        &self,
1532        raw_query: &str,
1533        query: &TruncateQuery,
1534    ) -> RedDBResult<RuntimeQueryResult> {
1535        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1536        if is_system_schema_name(&query.name) {
1537            return Err(RedDBError::Query("system schema is read-only".to_string()));
1538        }
1539
1540        let label = query
1541            .model
1542            .map(polymorphic_resolver::model_name)
1543            .unwrap_or("collection");
1544        let store = self.inner.db.store();
1545        if store.get_collection(&query.name).is_none() {
1546            if query.if_exists {
1547                return Ok(RuntimeQueryResult::ok_message(
1548                    raw_query.to_string(),
1549                    &format!("{label} '{}' does not exist", query.name),
1550                    "truncate",
1551                ));
1552            }
1553            return Err(RedDBError::NotFound(format!(
1554                "{label} '{}' not found",
1555                query.name
1556            )));
1557        }
1558
1559        let actual =
1560            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1561        if let Some(expected) = query.model {
1562            polymorphic_resolver::ensure_model_match(expected, actual)?;
1563        }
1564
1565        if actual == CollectionModel::Queue {
1566            return self.execute_queue_command(
1567                raw_query,
1568                &QueueCommand::Purge {
1569                    queue: query.name.clone(),
1570                },
1571            );
1572        }
1573
1574        // Count before wiping so we can emit the aggregated truncate event.
1575        let affected = self.truncate_collection_entities(&query.name)?;
1576        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1577        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1578        self.inner.db.invalidate_vector_index(&query.name);
1579        self.clear_table_planner_stats(&query.name);
1580        self.invalidate_result_cache();
1581
1582        Ok(RuntimeQueryResult::ok_message(
1583            raw_query.to_string(),
1584            &format!(
1585                "{affected} entities truncated from {label} '{}'",
1586                query.name
1587            ),
1588            "truncate",
1589        ))
1590    }
1591
1592    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1593        let store = self.inner.db.store();
1594        let Some(manager) = store.get_collection(name) else {
1595            return Ok(0);
1596        };
1597        let entities = manager.query_all(|_| true);
1598        if entities.is_empty() {
1599            return Ok(0);
1600        }
1601
1602        for entity in &entities {
1603            let fields = entity_index_fields(&entity.data);
1604            self.inner
1605                .index_store
1606                .index_entity_delete(name, entity.id, &fields)
1607                .map_err(RedDBError::Internal)?;
1608        }
1609
1610        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1611        let deleted_ids = store
1612            .delete_batch(name, &ids)
1613            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1614        for id in &deleted_ids {
1615            store.context_index().remove_entity(*id);
1616        }
1617        Ok(deleted_ids.len() as u64)
1618    }
1619
1620    fn drop_collection_storage(
1621        &self,
1622        raw_query: &str,
1623        name: &str,
1624        label: &str,
1625    ) -> RedDBResult<RuntimeQueryResult> {
1626        let store = self.inner.db.store();
1627
1628        // Emit 1 collection_dropped event before storage is wiped.
1629        // Queue is preserved; subscription is removed with the contract below.
1630        let final_count = store
1631            .get_collection(name)
1632            .map(|manager| manager.query_all(|_| true).len() as u64)
1633            .unwrap_or(0);
1634        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1635            self,
1636            name,
1637            final_count,
1638        )?;
1639
1640        let orphaned_indices: Vec<String> = self
1641            .inner
1642            .index_store
1643            .list_indices(name)
1644            .into_iter()
1645            .map(|index| index.name)
1646            .collect();
1647        for index_name in &orphaned_indices {
1648            self.inner.index_store.drop_index(index_name, name);
1649        }
1650
1651        store
1652            .drop_collection(name)
1653            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1654        self.inner.db.invalidate_vector_index(name);
1655        self.inner.db.clear_collection_default_ttl_ms(name);
1656        self.inner
1657            .db
1658            .remove_collection_contract(name)
1659            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1660        self.clear_table_planner_stats(name);
1661        self.invalidate_result_cache();
1662        if let Some(store) = self.inner.auth_store.read().clone() {
1663            store.invalidate_visible_collections_cache();
1664        }
1665        self.inner
1666            .db
1667            .persist_metadata()
1668            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1669        self.schema_vocabulary_apply(
1670            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1671                collection: name.to_string(),
1672            },
1673        );
1674
1675        Ok(RuntimeQueryResult::ok_message(
1676            raw_query.to_string(),
1677            &format!("{label} '{name}' dropped"),
1678            "drop",
1679        ))
1680    }
1681}
1682
/// Returns `true` when `name` belongs to the reserved system namespace:
/// exactly `red`, anything under the `red.` prefix, or anything starting
/// with `__red_schema_`. The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];

    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1686
1687fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1688    match data {
1689        EntityData::Row(row) => {
1690            if let Some(ref named) = row.named {
1691                named
1692                    .iter()
1693                    .map(|(key, value)| (key.clone(), value.clone()))
1694                    .collect()
1695            } else if let Some(ref schema) = row.schema {
1696                schema
1697                    .iter()
1698                    .zip(row.columns.iter())
1699                    .map(|(key, value)| (key.clone(), value.clone()))
1700                    .collect()
1701            } else {
1702                Vec::new()
1703            }
1704        }
1705        EntityData::Node(node) => node
1706            .properties
1707            .iter()
1708            .map(|(key, value)| (key.clone(), value.clone()))
1709            .collect(),
1710        _ => Vec::new(),
1711    }
1712}
1713
1714fn collection_contract_from_create_table(
1715    query: &CreateTableQuery,
1716) -> RedDBResult<crate::physical::CollectionContract> {
1717    let now = current_unix_ms();
1718    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1719        .columns
1720        .iter()
1721        .map(declared_column_contract_from_ddl)
1722        .collect();
1723    if query.timestamps {
1724        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1725        // columns that the write path populates from
1726        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1727        declared_columns.push(crate::physical::DeclaredColumnContract {
1728            name: "created_at".to_string(),
1729            data_type: "BIGINT".to_string(),
1730            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1731            not_null: true,
1732            default: None,
1733            compress: None,
1734            unique: false,
1735            primary_key: false,
1736            enum_variants: Vec::new(),
1737            array_element: None,
1738            decimal_precision: None,
1739        });
1740        declared_columns.push(crate::physical::DeclaredColumnContract {
1741            name: "updated_at".to_string(),
1742            data_type: "BIGINT".to_string(),
1743            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1744            not_null: true,
1745            default: None,
1746            compress: None,
1747            unique: false,
1748            primary_key: false,
1749            enum_variants: Vec::new(),
1750            array_element: None,
1751            decimal_precision: None,
1752        });
1753    }
1754    Ok(crate::physical::CollectionContract {
1755        name: query.name.clone(),
1756        declared_model: crate::catalog::CollectionModel::Table,
1757        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1758        origin: crate::physical::ContractOrigin::Explicit,
1759        version: 1,
1760        created_at_unix_ms: now,
1761        updated_at_unix_ms: now,
1762        default_ttl_ms: query.default_ttl_ms,
1763        vector_dimension: None,
1764        vector_metric: None,
1765        context_index_fields: query.context_index_fields.clone(),
1766        declared_columns,
1767        table_def: Some(build_table_def_from_create_table(query)?),
1768        timestamps_enabled: query.timestamps,
1769        context_index_enabled: query.context_index_enabled
1770            || !query.context_index_fields.is_empty(),
1771        metrics_raw_retention_ms: None,
1772        metrics_rollup_policies: Vec::new(),
1773        metrics_tenant_identity: None,
1774        metrics_namespace: None,
1775        append_only: query.append_only,
1776        subscriptions: query.subscriptions.clone(),
1777        analytics_config: Vec::new(),
1778        session_key: None,
1779        session_gap_ms: None,
1780        retention_duration_ms: None,
1781    })
1782}
1783
1784fn default_collection_contract_for_existing_table(
1785    name: &str,
1786) -> crate::physical::CollectionContract {
1787    let now = current_unix_ms();
1788    crate::physical::CollectionContract {
1789        name: name.to_string(),
1790        declared_model: crate::catalog::CollectionModel::Table,
1791        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1792        origin: crate::physical::ContractOrigin::Explicit,
1793        version: 0,
1794        created_at_unix_ms: now,
1795        updated_at_unix_ms: now,
1796        default_ttl_ms: None,
1797        vector_dimension: None,
1798        vector_metric: None,
1799        context_index_fields: Vec::new(),
1800        declared_columns: Vec::new(),
1801        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1802        timestamps_enabled: false,
1803        context_index_enabled: false,
1804        metrics_raw_retention_ms: None,
1805        metrics_rollup_policies: Vec::new(),
1806        metrics_tenant_identity: None,
1807        metrics_namespace: None,
1808        append_only: false,
1809        subscriptions: Vec::new(),
1810        analytics_config: Vec::new(),
1811        session_key: None,
1812        session_gap_ms: None,
1813        retention_duration_ms: None,
1814    }
1815}
1816
1817fn keyed_collection_contract(
1818    name: &str,
1819    model: crate::catalog::CollectionModel,
1820    analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
1821) -> crate::physical::CollectionContract {
1822    let now = current_unix_ms();
1823    crate::physical::CollectionContract {
1824        name: name.to_string(),
1825        declared_model: model,
1826        schema_mode: crate::catalog::SchemaMode::Dynamic,
1827        origin: crate::physical::ContractOrigin::Explicit,
1828        version: 1,
1829        created_at_unix_ms: now,
1830        updated_at_unix_ms: now,
1831        default_ttl_ms: None,
1832        vector_dimension: None,
1833        vector_metric: None,
1834        context_index_fields: Vec::new(),
1835        declared_columns: Vec::new(),
1836        table_def: None,
1837        timestamps_enabled: false,
1838        context_index_enabled: false,
1839        metrics_raw_retention_ms: None,
1840        metrics_rollup_policies: Vec::new(),
1841        metrics_tenant_identity: None,
1842        metrics_namespace: None,
1843        append_only: false,
1844        subscriptions: Vec::new(),
1845        analytics_config,
1846        session_key: None,
1847        session_gap_ms: None,
1848        retention_duration_ms: None,
1849    }
1850}
1851
1852fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1853    let now = current_unix_ms();
1854    crate::physical::CollectionContract {
1855        name: query.name.clone(),
1856        declared_model: crate::catalog::CollectionModel::Metrics,
1857        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1858        origin: crate::physical::ContractOrigin::Explicit,
1859        version: 1,
1860        created_at_unix_ms: now,
1861        updated_at_unix_ms: now,
1862        default_ttl_ms: query.default_ttl_ms,
1863        vector_dimension: None,
1864        vector_metric: None,
1865        context_index_fields: Vec::new(),
1866        declared_columns: Vec::new(),
1867        table_def: None,
1868        timestamps_enabled: false,
1869        context_index_enabled: false,
1870        metrics_raw_retention_ms: query.default_ttl_ms,
1871        metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1872        metrics_tenant_identity: Some(
1873            query
1874                .tenant_by
1875                .clone()
1876                .unwrap_or_else(|| "current_tenant".to_string()),
1877        ),
1878        metrics_namespace: Some("default".to_string()),
1879        append_only: true,
1880        subscriptions: Vec::new(),
1881        analytics_config: Vec::new(),
1882        session_key: None,
1883        session_gap_ms: None,
1884        retention_duration_ms: None,
1885    }
1886}
1887
1888fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1889    let now = current_unix_ms();
1890    crate::physical::CollectionContract {
1891        name: query.name.clone(),
1892        declared_model: crate::catalog::CollectionModel::Vector,
1893        schema_mode: crate::catalog::SchemaMode::Dynamic,
1894        origin: crate::physical::ContractOrigin::Explicit,
1895        version: 1,
1896        created_at_unix_ms: now,
1897        updated_at_unix_ms: now,
1898        default_ttl_ms: None,
1899        vector_dimension: Some(query.dimension),
1900        vector_metric: Some(query.metric),
1901        context_index_fields: Vec::new(),
1902        declared_columns: Vec::new(),
1903        table_def: None,
1904        timestamps_enabled: false,
1905        context_index_enabled: false,
1906        metrics_raw_retention_ms: None,
1907        metrics_rollup_policies: Vec::new(),
1908        metrics_tenant_identity: None,
1909        metrics_namespace: None,
1910        append_only: false,
1911        subscriptions: Vec::new(),
1912        analytics_config: Vec::new(),
1913        session_key: None,
1914        session_gap_ms: None,
1915        retention_duration_ms: None,
1916    }
1917}
1918
1919fn declared_column_contract_from_ddl(
1920    column: &CreateColumnDef,
1921) -> crate::physical::DeclaredColumnContract {
1922    crate::physical::DeclaredColumnContract {
1923        name: column.name.clone(),
1924        data_type: column.data_type.clone(),
1925        sql_type: Some(column.sql_type.clone()),
1926        not_null: column.not_null,
1927        default: column.default.clone(),
1928        compress: column.compress,
1929        unique: column.unique,
1930        primary_key: column.primary_key,
1931        enum_variants: column.enum_variants.clone(),
1932        array_element: column.array_element.clone(),
1933        decimal_precision: column.decimal_precision,
1934    }
1935}
1936
1937fn apply_alter_operations_to_contract(
1938    contract: &mut crate::physical::CollectionContract,
1939    operations: &[AlterOperation],
1940) {
1941    if contract.table_def.is_none() {
1942        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1943    }
1944    for operation in operations {
1945        match operation {
1946            AlterOperation::AddColumn(column) => {
1947                if !contract
1948                    .declared_columns
1949                    .iter()
1950                    .any(|existing| existing.name == column.name)
1951                {
1952                    contract
1953                        .declared_columns
1954                        .push(declared_column_contract_from_ddl(column));
1955                }
1956                if let Some(table_def) = contract.table_def.as_mut() {
1957                    if table_def.get_column(&column.name).is_none() {
1958                        if let Ok(column_def) = column_def_from_ddl(column) {
1959                            if column.primary_key {
1960                                table_def.primary_key.push(column.name.clone());
1961                                table_def.constraints.push(
1962                                    crate::storage::schema::Constraint::new(
1963                                        format!("pk_{}", column.name),
1964                                        crate::storage::schema::ConstraintType::PrimaryKey,
1965                                    )
1966                                    .on_columns(vec![column.name.clone()]),
1967                                );
1968                            }
1969                            if column.unique {
1970                                table_def.constraints.push(
1971                                    crate::storage::schema::Constraint::new(
1972                                        format!("uniq_{}", column.name),
1973                                        crate::storage::schema::ConstraintType::Unique,
1974                                    )
1975                                    .on_columns(vec![column.name.clone()]),
1976                                );
1977                            }
1978                            if column.not_null {
1979                                table_def.constraints.push(
1980                                    crate::storage::schema::Constraint::new(
1981                                        format!("not_null_{}", column.name),
1982                                        crate::storage::schema::ConstraintType::NotNull,
1983                                    )
1984                                    .on_columns(vec![column.name.clone()]),
1985                                );
1986                            }
1987                            table_def.columns.push(column_def);
1988                        }
1989                    }
1990                }
1991            }
1992            AlterOperation::DropColumn(name) => {
1993                contract
1994                    .declared_columns
1995                    .retain(|column| column.name != *name);
1996                if let Some(table_def) = contract.table_def.as_mut() {
1997                    if let Some(index) = table_def.column_index(name) {
1998                        table_def.columns.remove(index);
1999                    }
2000                    table_def.primary_key.retain(|column| column != name);
2001                    table_def.constraints.retain(|constraint| {
2002                        !constraint.columns.iter().any(|column| column == name)
2003                    });
2004                    table_def
2005                        .indexes
2006                        .retain(|index| !index.columns.iter().any(|column| column == name));
2007                }
2008            }
2009            AlterOperation::RenameColumn { from, to } => {
2010                if contract
2011                    .declared_columns
2012                    .iter()
2013                    .any(|column| column.name == *to)
2014                {
2015                    continue;
2016                }
2017                if let Some(column) = contract
2018                    .declared_columns
2019                    .iter_mut()
2020                    .find(|column| column.name == *from)
2021                {
2022                    column.name = to.clone();
2023                }
2024                if let Some(table_def) = contract.table_def.as_mut() {
2025                    if let Some(column) = table_def
2026                        .columns
2027                        .iter_mut()
2028                        .find(|column| column.name == *from)
2029                    {
2030                        column.name = to.clone();
2031                    }
2032                    for primary_key in &mut table_def.primary_key {
2033                        if *primary_key == *from {
2034                            *primary_key = to.clone();
2035                        }
2036                    }
2037                    for constraint in &mut table_def.constraints {
2038                        for column in &mut constraint.columns {
2039                            if *column == *from {
2040                                *column = to.clone();
2041                            }
2042                        }
2043                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
2044                            for column in ref_columns {
2045                                if *column == *from {
2046                                    *column = to.clone();
2047                                }
2048                            }
2049                        }
2050                    }
2051                    for index in &mut table_def.indexes {
2052                        for column in &mut index.columns {
2053                            if *column == *from {
2054                                *column = to.clone();
2055                            }
2056                        }
2057                    }
2058                }
2059            }
2060            // Partition ops don't touch the column contract — metadata is
2061            // persisted separately via `red_config.partition.*`.
2062            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
2063            // RLS toggles don't touch the column contract — flag is persisted
2064            // separately via `red_config.rls.enabled.{table}` and enforced
2065            // through the in-memory `rls_enabled_tables` set.
2066            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
2067            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
2068            // and are enforced through `tenant_tables` + RLS auto-policy.
2069            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
2070            AlterOperation::SetAppendOnly(on) => {
2071                contract.append_only = *on;
2072            }
2073            // VCS opt-in is persisted to red_vcs_settings by the
2074            // executor, not the contract — nothing to do here.
2075            AlterOperation::SetVersioned(_) => {}
2076            AlterOperation::EnableEvents(subscription) => {
2077                let mut subscription = subscription.clone();
2078                subscription.source = contract.name.clone();
2079                subscription.enabled = true;
2080                if let Some(existing) = contract
2081                    .subscriptions
2082                    .iter_mut()
2083                    .find(|existing| existing.target_queue == subscription.target_queue)
2084                {
2085                    *existing = subscription;
2086                } else {
2087                    contract.subscriptions.push(subscription);
2088                }
2089            }
2090            AlterOperation::DisableEvents => {
2091                for subscription in &mut contract.subscriptions {
2092                    subscription.enabled = false;
2093                }
2094            }
2095            AlterOperation::AddSubscription { name, descriptor } => {
2096                let mut sub = descriptor.clone();
2097                sub.name = name.clone();
2098                sub.source = contract.name.clone();
2099                sub.enabled = true;
2100                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
2101                {
2102                    *existing = sub;
2103                } else {
2104                    contract.subscriptions.push(sub);
2105                }
2106            }
2107            AlterOperation::DropSubscription { name } => {
2108                contract.subscriptions.retain(|s| s.name != *name);
2109            }
2110            // Signer registry mutations live in `red_config` outside the
2111            // contract surface — the executor applied them directly via
2112            // `signed_writes_kind::{add,revoke}_signer`. Nothing to fold
2113            // into the column-shaped contract.
2114            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
2115            AlterOperation::SetRetention { duration_ms } => {
2116                contract.retention_duration_ms = Some(*duration_ms);
2117            }
2118            AlterOperation::UnsetRetention => {
2119                contract.retention_duration_ms = None;
2120            }
2121            // Issue #801 — fold analytics lifecycle into the WAL-backed
2122            // `analytics_config` so the change is durable and the
2123            // `<graph>.<output>` resolver picks it up on the next read.
2124            AlterOperation::AddAnalytics(views) => {
2125                for view in views {
2126                    // Idempotent: skip outputs already enabled so a repeated
2127                    // ADD never duplicates state. The first declaration wins —
2128                    // re-declaring options requires DROP then ADD.
2129                    if !contract
2130                        .analytics_config
2131                        .iter()
2132                        .any(|existing| existing.output == view.output)
2133                    {
2134                        contract.analytics_config.push(view.clone());
2135                    }
2136                }
2137            }
2138            AlterOperation::DropAnalytics(output) => {
2139                contract
2140                    .analytics_config
2141                    .retain(|view| view.output != *output);
2142            }
2143        }
2144    }
2145}
2146
2147/// Issue #580 — returns true if the contract carries at least one
2148/// column the retention filter can use as a timestamp anchor:
2149/// either `WITH timestamps = true` (auto `created_at` / `updated_at`)
2150/// or a user-declared column with a temporal data_type.
2151pub(crate) fn retention_timestamp_column_exists(
2152    contract: &crate::physical::CollectionContract,
2153) -> bool {
2154    if contract.timestamps_enabled {
2155        return true;
2156    }
2157    if matches!(
2158        contract.declared_model,
2159        crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
2160    ) {
2161        // Time-series and metrics collections carry an intrinsic
2162        // timestamp axis on every row even without a declared column;
2163        // retention has a natural anchor.
2164        return true;
2165    }
2166    contract
2167        .declared_columns
2168        .iter()
2169        .any(|column| is_temporal_data_type(&column.data_type))
2170}
2171
/// Returns `true` when `data_type` names one of the temporal types
/// recognised here: `TIMESTAMP`, `TIMESTAMPMS`, `TIMESTAMP_MS`,
/// `DATETIME` or `DATE`.
///
/// The comparison ignores ASCII case; no trimming or other
/// normalisation is applied.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPE_NAMES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];
    TEMPORAL_TYPE_NAMES
        .iter()
        .any(|name| data_type.eq_ignore_ascii_case(name))
}
2179
2180fn validate_event_subscriptions(
2181    runtime: &RedDBRuntime,
2182    source: &str,
2183    subscriptions: &[crate::catalog::SubscriptionDescriptor],
2184) -> RedDBResult<()> {
2185    for subscription in subscriptions
2186        .iter()
2187        .filter(|subscription| subscription.enabled)
2188    {
2189        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2190            return Err(RedDBError::Query(
2191                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2192            ));
2193        }
2194        validate_subscription_auth(runtime, source, subscription)?;
2195        if subscription.target_queue == source
2196            || subscription_would_create_cycle(
2197                &runtime.inner.db,
2198                source,
2199                &subscription.target_queue,
2200            )
2201        {
2202            return Err(RedDBError::Query(
2203                "subscription would create cycle".to_string(),
2204            ));
2205        }
2206        audit_subscription_redact_gap(runtime, source, subscription);
2207    }
2208    Ok(())
2209}
2210
2211fn validate_subscription_auth(
2212    runtime: &RedDBRuntime,
2213    source: &str,
2214    subscription: &crate::catalog::SubscriptionDescriptor,
2215) -> RedDBResult<()> {
2216    let auth_store = match runtime.inner.auth_store.read().clone() {
2217        Some(store) => store,
2218        None => return Ok(()),
2219    };
2220    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2221        Some(identity) => identity,
2222        None => return Ok(()),
2223    };
2224    let tenant = crate::runtime::impl_core::current_tenant();
2225    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2226
2227    if auth_store.iam_authorization_enabled() {
2228        let ctx = crate::auth::policies::EvalContext {
2229            principal_tenant: tenant.clone(),
2230            current_tenant: tenant.clone(),
2231            peer_ip: None,
2232            mfa_present: false,
2233            now_ms: crate::auth::now_ms(),
2234            principal_is_admin_role: role == crate::auth::Role::Admin,
2235            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2236            principal_is_platform_scoped: principal.tenant.is_none(),
2237        };
2238        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2239        if let Some(t) = tenant.as_deref() {
2240            source_resource = source_resource.with_tenant(t.to_string());
2241        }
2242        if !auth_store.check_policy_authz_with_role(
2243            &principal,
2244            "select",
2245            &source_resource,
2246            &ctx,
2247            role,
2248        ) {
2249            return Err(RedDBError::Query(format!(
2250                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2251                principal, source_resource.kind, source_resource.name
2252            )));
2253        }
2254
2255        let mut target_resource =
2256            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2257        if let Some(t) = tenant.as_deref() {
2258            target_resource = target_resource.with_tenant(t.to_string());
2259        }
2260        if !auth_store.check_policy_authz_with_role(
2261            &principal,
2262            "write",
2263            &target_resource,
2264            &ctx,
2265            role,
2266        ) {
2267            return Err(RedDBError::Query(format!(
2268                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2269                principal, target_resource.kind, target_resource.name
2270            )));
2271        }
2272        return Ok(());
2273    }
2274
2275    let ctx = crate::auth::privileges::AuthzContext {
2276        principal: &username,
2277        effective_role: role,
2278        tenant: tenant.as_deref(),
2279    };
2280    auth_store
2281        .check_grant(
2282            &ctx,
2283            crate::auth::privileges::Action::Select,
2284            &crate::auth::privileges::Resource::table_from_name(source),
2285        )
2286        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2287    auth_store
2288        .check_grant(
2289            &ctx,
2290            crate::auth::privileges::Action::Insert,
2291            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2292        )
2293        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2294    Ok(())
2295}
2296
2297fn audit_subscription_redact_gap(
2298    runtime: &RedDBRuntime,
2299    source: &str,
2300    subscription: &crate::catalog::SubscriptionDescriptor,
2301) {
2302    let auth_store = match runtime.inner.auth_store.read().clone() {
2303        Some(store) if store.iam_authorization_enabled() => store,
2304        _ => return,
2305    };
2306    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2307        Some(identity) => identity,
2308        None => return,
2309    };
2310    let tenant = crate::runtime::impl_core::current_tenant();
2311    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2312    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2313    if missing.is_empty() {
2314        return;
2315    }
2316
2317    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2318    tracing::warn!(
2319        target: "reddb::operator",
2320        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2321        source,
2322        subscription.target_queue,
2323        columns
2324    );
2325    let mut event = AuditEvent::builder("subscription_redact_gap")
2326        .principal(username)
2327        .source(AuditAuthSource::System)
2328        .resource(format!(
2329            "subscription:{}->{}",
2330            source, subscription.target_queue
2331        ))
2332        .outcome(Outcome::Success)
2333        .field(AuditFieldEscaper::field("source", source))
2334        .field(AuditFieldEscaper::field(
2335            "target_queue",
2336            subscription.target_queue.clone(),
2337        ))
2338        .field(AuditFieldEscaper::field(
2339            "subscription",
2340            subscription.name.clone(),
2341        ))
2342        .field(AuditFieldEscaper::field("columns", columns))
2343        .field(AuditFieldEscaper::field("role", role.as_str()));
2344    if let Some(t) = tenant {
2345        event = event.tenant(t);
2346    }
2347    runtime.inner.audit_log.record_event(event.build());
2348}
2349
2350fn subscription_redact_gap_columns(
2351    auth_store: &crate::auth::store::AuthStore,
2352    principal: &crate::auth::UserId,
2353    source: &str,
2354    subscription: &crate::catalog::SubscriptionDescriptor,
2355) -> BTreeSet<String> {
2356    let redacted: HashSet<String> = subscription
2357        .redact_fields
2358        .iter()
2359        .map(|field| field.to_ascii_lowercase())
2360        .collect();
2361    auth_store
2362        .effective_policies(principal)
2363        .iter()
2364        .flat_map(|policy| policy.statements.iter())
2365        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2366        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2367        .flat_map(|statement| statement.resources.iter())
2368        .filter_map(|resource| denied_column_for_source(resource, source))
2369        .filter(|column| !redact_covers_column(&redacted, source, column))
2370        .collect()
2371}
2372
2373fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2374    match pattern {
2375        crate::auth::policies::ActionPattern::Wildcard => true,
2376        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2377        crate::auth::policies::ActionPattern::Prefix(prefix) => {
2378            "select".len() > prefix.len() + 1
2379                && "select".starts_with(prefix)
2380                && "select".as_bytes()[prefix.len()] == b':'
2381        }
2382    }
2383}
2384
2385fn denied_column_for_source(
2386    resource: &crate::auth::policies::ResourcePattern,
2387    source: &str,
2388) -> Option<String> {
2389    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2390        return None;
2391    };
2392    if kind != "column" {
2393        return None;
2394    }
2395    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2396    (column.table_resource_name() == source).then_some(column.column)
2397}
2398
/// Reports whether the redaction set covers `column` of `source`.
///
/// A column is covered when the set contains `*`, the lowercased column
/// name, or the lowercased `source.column` form. Entries in `redacted`
/// are compared as-is, so they are expected to be lowercase already.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
2404
2405fn subscription_would_create_cycle(
2406    db: &crate::storage::unified::devx::RedDB,
2407    source: &str,
2408    target: &str,
2409) -> bool {
2410    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2411    for contract in db.collection_contracts() {
2412        for subscription in contract
2413            .subscriptions
2414            .into_iter()
2415            .filter(|subscription| subscription.enabled)
2416        {
2417            graph
2418                .entry(subscription.source)
2419                .or_default()
2420                .push(subscription.target_queue);
2421        }
2422    }
2423    graph
2424        .entry(source.to_string())
2425        .or_default()
2426        .push(target.to_string());
2427
2428    let mut stack = vec![target.to_string()];
2429    let mut seen = HashSet::new();
2430    while let Some(node) = stack.pop() {
2431        if node == source {
2432            return true;
2433        }
2434        if !seen.insert(node.clone()) {
2435            continue;
2436        }
2437        if let Some(next) = graph.get(&node) {
2438            stack.extend(next.iter().cloned());
2439        }
2440    }
2441    false
2442}
2443
2444pub(crate) fn ensure_event_target_queue_pub(
2445    runtime: &RedDBRuntime,
2446    queue: &str,
2447) -> RedDBResult<()> {
2448    ensure_event_target_queue(runtime, queue)
2449}
2450
2451fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2452    let store = runtime.inner.db.store();
2453    if store.get_collection(queue).is_some() {
2454        return Ok(());
2455    }
2456    store
2457        .create_collection(queue)
2458        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2459    runtime
2460        .inner
2461        .db
2462        .save_collection_contract(event_queue_collection_contract(queue))
2463        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2464    store.set_config_tree(
2465        &format!("queue.{queue}.mode"),
2466        &crate::serde_json::Value::String("fanout".to_string()),
2467    );
2468    Ok(())
2469}
2470
2471fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2472    let now = current_unix_ms();
2473    crate::physical::CollectionContract {
2474        name: queue.to_string(),
2475        declared_model: crate::catalog::CollectionModel::Queue,
2476        schema_mode: crate::catalog::SchemaMode::Dynamic,
2477        origin: crate::physical::ContractOrigin::Implicit,
2478        version: 1,
2479        created_at_unix_ms: now,
2480        updated_at_unix_ms: now,
2481        default_ttl_ms: None,
2482        vector_dimension: None,
2483        vector_metric: None,
2484        context_index_fields: Vec::new(),
2485        declared_columns: Vec::new(),
2486        table_def: None,
2487        timestamps_enabled: false,
2488        context_index_enabled: false,
2489        metrics_raw_retention_ms: None,
2490        metrics_rollup_policies: Vec::new(),
2491        metrics_tenant_identity: None,
2492        metrics_namespace: None,
2493        append_only: true,
2494        subscriptions: Vec::new(),
2495        analytics_config: Vec::new(),
2496        session_key: None,
2497        session_gap_ms: None,
2498        retention_duration_ms: None,
2499    }
2500}
2501
2502fn build_table_def_from_create_table(
2503    query: &CreateTableQuery,
2504) -> RedDBResult<crate::storage::schema::TableDef> {
2505    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2506    for column in &query.columns {
2507        if column.primary_key {
2508            table.primary_key.push(column.name.clone());
2509            table.constraints.push(
2510                crate::storage::schema::Constraint::new(
2511                    format!("pk_{}", column.name),
2512                    crate::storage::schema::ConstraintType::PrimaryKey,
2513                )
2514                .on_columns(vec![column.name.clone()]),
2515            );
2516        }
2517        if column.unique {
2518            table.constraints.push(
2519                crate::storage::schema::Constraint::new(
2520                    format!("uniq_{}", column.name),
2521                    crate::storage::schema::ConstraintType::Unique,
2522                )
2523                .on_columns(vec![column.name.clone()]),
2524            );
2525        }
2526        if column.not_null {
2527            table.constraints.push(
2528                crate::storage::schema::Constraint::new(
2529                    format!("not_null_{}", column.name),
2530                    crate::storage::schema::ConstraintType::NotNull,
2531                )
2532                .on_columns(vec![column.name.clone()]),
2533            );
2534        }
2535        table.columns.push(column_def_from_ddl(column)?);
2536    }
2537    // WITH timestamps = true: append the two runtime-managed columns
2538    // to the schema so resolved_contract_columns exposes them to the
2539    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
2540    // not-nullable; the write path auto-fills them.
2541    if query.timestamps {
2542        table.columns.push(
2543            crate::storage::schema::ColumnDef::new(
2544                "created_at".to_string(),
2545                crate::storage::schema::DataType::UnsignedInteger,
2546            )
2547            .not_null(),
2548        );
2549        table.columns.push(
2550            crate::storage::schema::ColumnDef::new(
2551                "updated_at".to_string(),
2552                crate::storage::schema::DataType::UnsignedInteger,
2553            )
2554            .not_null(),
2555        );
2556        table.constraints.push(
2557            crate::storage::schema::Constraint::new(
2558                "not_null_created_at".to_string(),
2559                crate::storage::schema::ConstraintType::NotNull,
2560            )
2561            .on_columns(vec!["created_at".to_string()]),
2562        );
2563        table.constraints.push(
2564            crate::storage::schema::Constraint::new(
2565                "not_null_updated_at".to_string(),
2566                crate::storage::schema::ConstraintType::NotNull,
2567            )
2568            .on_columns(vec!["updated_at".to_string()]),
2569        );
2570    }
2571    table
2572        .validate()
2573        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2574    Ok(table)
2575}
2576
2577fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2578    let data_type = resolve_declared_data_type(&column.data_type)
2579        .map_err(|err| RedDBError::Query(err.to_string()))?;
2580    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2581    if column.not_null {
2582        column_def = column_def.not_null();
2583    }
2584    if let Some(default) = &column.default {
2585        column_def = column_def.with_default(default.as_bytes().to_vec());
2586    }
2587    if column.compress.unwrap_or(0) > 0 {
2588        column_def = column_def.compressed();
2589    }
2590    if !column.enum_variants.is_empty() {
2591        column_def = column_def.with_variants(column.enum_variants.clone());
2592    }
2593    if let Some(precision) = column.decimal_precision {
2594        column_def = column_def.with_precision(precision);
2595    }
2596    if let Some(element_type) = &column.array_element {
2597        column_def = column_def.with_element_type(
2598            resolve_declared_data_type(element_type)
2599                .map_err(|err| RedDBError::Query(err.to_string()))?,
2600        );
2601    }
2602    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2603    if column.unique {
2604        column_def = column_def.with_metadata("unique", "true");
2605    }
2606    if column.primary_key {
2607        column_def = column_def.with_metadata("primary_key", "true");
2608    }
2609    Ok(column_def)
2610}
2611
/// Returns the current wall-clock time as milliseconds since the Unix
/// epoch, or `0` when the system clock reads earlier than the epoch.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2618
2619#[cfg(test)]
2620mod tests {
2621    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2622    use crate::auth::store::{AuthStore, PrincipalRef};
2623    use crate::auth::UserId;
2624    use crate::auth::{AuthConfig, Role};
2625    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2626    use crate::storage::schema::Value;
2627    use crate::{RedDBOptions, RedDBRuntime};
2628    use std::sync::Arc;
2629
2630    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2631        Policy {
2632            id: id.to_string(),
2633            version: 1,
2634            tenant: None,
2635            created_at: 0,
2636            updated_at: 0,
2637            statements: vec![Statement {
2638                sid: None,
2639                effect: Effect::Allow,
2640                actions: vec![ActionPattern::Exact(action.to_string())],
2641                resources: vec![ResourcePattern::Exact {
2642                    kind: "collection".to_string(),
2643                    name: collection.to_string(),
2644                }],
2645                condition: None,
2646            }],
2647        }
2648    }
2649
2650    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2651        let store = Arc::new(AuthStore::new(AuthConfig::default()));
2652        *rt.inner.auth_store.write() = Some(store.clone());
2653        store
2654    }
2655
2656    #[test]
2657    fn drop_denied_without_iam_policy() {
2658        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2659        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2660        let store = wire_auth_store(&rt);
2661        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
2662        let select_only = Policy {
2663            id: "select-only".to_string(),
2664            version: 1,
2665            tenant: None,
2666            created_at: 0,
2667            updated_at: 0,
2668            statements: vec![Statement {
2669                sid: None,
2670                effect: Effect::Allow,
2671                actions: vec![ActionPattern::Exact("select".to_string())],
2672                resources: vec![ResourcePattern::Wildcard],
2673                condition: None,
2674            }],
2675        };
2676        store.put_policy_internal(select_only).unwrap();
2677        let alice = UserId::from_parts(None, "alice");
2678        store
2679            .attach_policy(PrincipalRef::User(alice), "select-only")
2680            .unwrap();
2681        set_current_auth_identity("alice".to_string(), Role::Write);
2682        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2683        clear_current_auth_identity();
2684        assert!(
2685            format!("{err}").contains("denied by IAM policy"),
2686            "got: {err}"
2687        );
2688    }
2689
2690    #[test]
2691    fn drop_allowed_with_explicit_iam_policy() {
2692        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2693        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2694        let store = wire_auth_store(&rt);
2695        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2696        store.put_policy_internal(policy).unwrap();
2697        let bob = UserId::from_parts(None, "bob");
2698        store
2699            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2700            .unwrap();
2701        set_current_auth_identity("bob".to_string(), Role::Write);
2702        rt.execute_query("DROP TABLE bar").unwrap();
2703        clear_current_auth_identity();
2704    }
2705
2706    #[test]
2707    fn drop_allowed_with_wildcard_iam_policy() {
2708        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2709        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2710        let store = wire_auth_store(&rt);
2711        let policy = Policy {
2712            id: "allow-drop-all".to_string(),
2713            version: 1,
2714            tenant: None,
2715            created_at: 0,
2716            updated_at: 0,
2717            statements: vec![Statement {
2718                sid: None,
2719                effect: Effect::Allow,
2720                actions: vec![ActionPattern::Exact("drop".to_string())],
2721                resources: vec![ResourcePattern::Wildcard],
2722                condition: None,
2723            }],
2724        };
2725        store.put_policy_internal(policy).unwrap();
2726        let carl = UserId::from_parts(None, "carl");
2727        store
2728            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2729            .unwrap();
2730        set_current_auth_identity("carl".to_string(), Role::Write);
2731        rt.execute_query("DROP TABLE baz").unwrap();
2732        clear_current_auth_identity();
2733    }
2734
2735    #[test]
2736    fn truncate_denied_without_iam_policy() {
2737        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2738        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2739        let store = wire_auth_store(&rt);
2740        // Acceptance #2 (#712 / S5A): in `policy_only` mode a
2741        // principal with no matching policy is denied even if their
2742        // role would have permitted the action. The pre-#712 default
2743        // of "no policy → deny" only holds under `policy_only`, so
2744        // pin the mode explicitly here so the assertion holds
2745        // regardless of the construction-time default.
2746        store
2747            .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
2748        // A policy exists (IAM active) but gives no truncate right.
2749        let select_only = Policy {
2750            id: "select-only-2".to_string(),
2751            version: 1,
2752            tenant: None,
2753            created_at: 0,
2754            updated_at: 0,
2755            statements: vec![Statement {
2756                sid: None,
2757                effect: Effect::Allow,
2758                actions: vec![ActionPattern::Exact("select".to_string())],
2759                resources: vec![ResourcePattern::Wildcard],
2760                condition: None,
2761            }],
2762        };
2763        store.put_policy_internal(select_only).unwrap();
2764        let dana = UserId::from_parts(None, "dana");
2765        store
2766            .attach_policy(PrincipalRef::User(dana), "select-only-2")
2767            .unwrap();
2768        set_current_auth_identity("dana".to_string(), Role::Write);
2769        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2770        clear_current_auth_identity();
2771        assert!(
2772            format!("{err}").contains("denied by IAM policy"),
2773            "got: {err}"
2774        );
2775    }
2776
2777    #[test]
2778    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2779        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2780        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2781            .unwrap();
2782        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2783            .unwrap();
2784        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2785            .unwrap();
2786
2787        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2788        assert_eq!(truncated.statement_type, "truncate");
2789        assert_eq!(truncated.affected_rows, 0);
2790
2791        let empty = rt.execute_query("SELECT id FROM users").unwrap();
2792        assert!(empty.result.records.is_empty());
2793
2794        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2795            .unwrap();
2796        let selected = rt
2797            .execute_query("SELECT name FROM users WHERE id = 3")
2798            .unwrap();
2799        let name = selected.result.records[0].get("name").unwrap();
2800        assert_eq!(name, &Value::text("cy"));
2801        assert!(rt.db().collection_contract("users").is_some());
2802        assert!(rt
2803            .inner
2804            .index_store
2805            .list_indices("users")
2806            .iter()
2807            .any(|index| index.name == "idx_users_id"));
2808    }
2809
2810    #[test]
2811    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2812        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2813        rt.execute_query("CREATE QUEUE tasks").unwrap();
2814        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2815
2816        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2817        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2818
2819        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2820        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2821        assert_eq!(
2822            len.result.records[0].get("len"),
2823            Some(&Value::UnsignedInteger(0))
2824        );
2825    }
2826
2827    #[test]
2828    fn truncate_system_schema_is_read_only() {
2829        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2830        let err = rt
2831            .execute_query("TRUNCATE COLLECTION red.collections")
2832            .unwrap_err();
2833        assert!(format!("{err}").contains("system schema is read-only"));
2834    }
2835
2836    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────
2837
2838    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2839        let result = rt
2840            .execute_query(&format!("QUEUE PEEK {queue} 100"))
2841            .expect("peek queue");
2842        result
2843            .result
2844            .records
2845            .iter()
2846            .map(
2847                |record| match record.get("payload").expect("payload column") {
2848                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2849                    other => panic!("expected JSON queue payload, got {other:?}"),
2850                },
2851            )
2852            .collect()
2853    }
2854
2855    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
2856    /// `truncate` event, not one delete event per row.
2857    #[test]
2858    fn truncate_event_enabled_table_emits_single_truncate_event() {
2859        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2860        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2861            .unwrap();
2862        rt.execute_query(
2863            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2864        )
2865        .unwrap();
2866
2867        // Drain the 3 insert events so we start clean.
2868        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2869
2870        rt.execute_query("TRUNCATE TABLE users").unwrap();
2871
2872        let events = queue_payloads(&rt, "users_events");
2873        // Must be exactly 1 truncate event, not 3 delete events.
2874        assert_eq!(
2875            events.len(),
2876            1,
2877            "expected 1 truncate event, got {}",
2878            events.len()
2879        );
2880        let ev = events[0].as_object().expect("event is object");
2881        assert_eq!(
2882            ev.get("op").and_then(crate::json::Value::as_str),
2883            Some("truncate")
2884        );
2885        assert_eq!(
2886            ev.get("collection").and_then(crate::json::Value::as_str),
2887            Some("users")
2888        );
2889        assert_eq!(
2890            ev.get("entities_count")
2891                .and_then(crate::json::Value::as_u64),
2892            Some(3)
2893        );
2894        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2895        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2896        assert!(ev
2897            .get("event_id")
2898            .and_then(crate::json::Value::as_str)
2899            .is_some_and(|s| !s.is_empty()));
2900    }
2901
2902    /// `TRUNCATE users` on a collection without event subscription emits no events.
2903    #[test]
2904    fn truncate_no_events_collection_emits_nothing() {
2905        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2906        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2907            .unwrap();
2908        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2909            .unwrap();
2910        // No EVENTS subscription — truncate must work without touching any queue.
2911        rt.execute_query("TRUNCATE TABLE plain").unwrap();
2912        // No crash, no queue to check. Just verify truncation happened.
2913        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2914        assert!(rows.result.records.is_empty());
2915    }
2916
2917    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
2918    /// `collection_dropped` event. The subscription is removed from the
2919    /// source contract but the target queue is preserved for consumer drain.
2920    #[test]
2921    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2922        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2923        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2924            .unwrap();
2925        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2926            .unwrap();
2927
2928        // Drain insert events so we start clean.
2929        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2930
2931        rt.execute_query("DROP TABLE users").unwrap();
2932
2933        // Queue must still exist with 1 collection_dropped event.
2934        let events = queue_payloads(&rt, "users_events");
2935        assert_eq!(
2936            events.len(),
2937            1,
2938            "expected 1 collection_dropped event, got {}",
2939            events.len()
2940        );
2941        let ev = events[0].as_object().expect("event is object");
2942        assert_eq!(
2943            ev.get("op").and_then(crate::json::Value::as_str),
2944            Some("collection_dropped")
2945        );
2946        assert_eq!(
2947            ev.get("collection").and_then(crate::json::Value::as_str),
2948            Some("users")
2949        );
2950        assert_eq!(
2951            ev.get("final_entities_count")
2952                .and_then(crate::json::Value::as_u64),
2953            Some(2)
2954        );
2955        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2956        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2957        assert!(ev
2958            .get("event_id")
2959            .and_then(crate::json::Value::as_str)
2960            .is_some_and(|s| !s.is_empty()));
2961
2962        // Source collection is gone.
2963        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
2964        assert!(
2965            format!("{err}").contains("users"),
2966            "expected not-found error"
2967        );
2968    }
2969
2970    /// `DROP TABLE users` on a collection without event subscription works
2971    /// normally with no event emitted.
2972    #[test]
2973    fn drop_no_events_collection_emits_nothing() {
2974        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2975        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2976            .unwrap();
2977        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
2978            .unwrap();
2979        rt.execute_query("DROP TABLE plain").unwrap();
2980        // No crash and collection is gone.
2981        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
2982        assert!(format!("{err}").contains("plain"));
2983    }
2984
2985    // ── #297: ops_filter + WHERE filter ────────────────────────────────────
2986
2987    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
2988    #[test]
2989    fn ops_filter_insert_only_ignores_update_and_delete() {
2990        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2991        rt.execute_query(
2992            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
2993        )
2994        .unwrap();
2995        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
2996            .unwrap();
2997        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
2998            .unwrap();
2999        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
3000
3001        let events = queue_payloads(&rt, "items_events");
3002        // Only the INSERT should have fired.
3003        assert_eq!(
3004            events.len(),
3005            1,
3006            "expected 1 insert event, got {}",
3007            events.len()
3008        );
3009        assert_eq!(
3010            events[0]
3011                .as_object()
3012                .unwrap()
3013                .get("op")
3014                .and_then(crate::json::Value::as_str),
3015            Some("insert")
3016        );
3017    }
3018
3019    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
3020    #[test]
3021    fn where_filter_skips_rows_that_do_not_match() {
3022        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3023        rt.execute_query(
3024            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
3025        )
3026        .unwrap();
3027
3028        // This row should generate an event.
3029        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
3030            .unwrap();
3031        // This row should NOT generate an event.
3032        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
3033            .unwrap();
3034
3035        let events = queue_payloads(&rt, "users_events");
3036        assert_eq!(
3037            events.len(),
3038            1,
3039            "expected 1 event (only active), got {}",
3040            events.len()
3041        );
3042        let ev = events[0].as_object().unwrap();
3043        assert_eq!(
3044            ev.get("op").and_then(crate::json::Value::as_str),
3045            Some("insert")
3046        );
3047        let after = ev.get("after").unwrap().as_object().unwrap();
3048        assert_eq!(
3049            after.get("status").and_then(crate::json::Value::as_str),
3050            Some("active")
3051        );
3052    }
3053
3054    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
3055    #[test]
3056    fn ops_filter_and_where_filter_combined() {
3057        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3058        rt.execute_query(
3059            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
3060        )
3061        .unwrap();
3062
3063        // INSERT active → event
3064        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
3065            .unwrap();
3066        // INSERT inactive → no event
3067        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
3068            .unwrap();
3069        // UPDATE row 1 to inactive → after = inactive, no event
3070        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
3071            .unwrap();
3072        // DELETE → ops_filter excludes it
3073        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
3074
3075        let events = queue_payloads(&rt, "items_events");
3076        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
3077        assert_eq!(
3078            events.len(),
3079            1,
3080            "expected 1 event, got {}: {events:?}",
3081            events.len()
3082        );
3083        assert_eq!(
3084            events[0]
3085                .as_object()
3086                .unwrap()
3087                .get("op")
3088                .and_then(crate::json::Value::as_str),
3089            Some("insert")
3090        );
3091    }
3092
3093    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
3094    #[test]
3095    fn where_filter_on_delete_checks_before_state() {
3096        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3097        rt.execute_query(
3098            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
3099        )
3100        .unwrap();
3101
3102        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
3103            .unwrap();
3104
3105        // Delete active row → event (before-state was active)
3106        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3107        // Delete inactive row → no event (before-state was inactive)
3108        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
3109
3110        let events = queue_payloads(&rt, "users_events");
3111        assert_eq!(
3112            events.len(),
3113            1,
3114            "expected 1 delete event, got {}",
3115            events.len()
3116        );
3117        let ev = events[0].as_object().unwrap();
3118        assert_eq!(
3119            ev.get("op").and_then(crate::json::Value::as_str),
3120            Some("delete")
3121        );
3122    }
3123
3124    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────
3125
3126    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
3127    #[test]
3128    fn alter_add_column_on_event_enabled_table_succeeds() {
3129        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3130        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
3131            .unwrap();
3132        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
3133        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
3134            .unwrap();
3135        // The column is now in the contract.
3136        let contract = rt.db().collection_contract("users").unwrap();
3137        assert!(
3138            contract.declared_columns.iter().any(|c| c.name == "phone"),
3139            "phone column should be in contract"
3140        );
3141        // Subscription still enabled after the alter.
3142        assert!(
3143            contract.subscriptions.iter().any(|s| s.enabled),
3144            "subscription should remain enabled"
3145        );
3146    }
3147
3148    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
3149    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
3150    #[test]
3151    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
3152        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3153        rt.execute_query(
3154            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
3155        )
3156        .unwrap();
3157        // DROP COLUMN — schema change event path exercises, must not error.
3158        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
3159            .unwrap();
3160        let contract = rt.db().collection_contract("items").unwrap();
3161        assert!(
3162            !contract.declared_columns.iter().any(|c| c.name == "secret"),
3163            "secret column should be removed"
3164        );
3165        // ENABLE RLS — non-column op, no schema-change event (coverage).
3166        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
3167            .unwrap();
3168        // Collection and subscription still intact.
3169        assert!(
3170            contract.subscriptions.iter().any(|s| s.enabled),
3171            "subscription should remain enabled"
3172        );
3173    }
3174
3175    /// Slice G (#675) — every newly-created `CollectionModel::Vector`
3176    /// collection is marked `vector.turbo` and gains a materialised
3177    /// `TurboCollectionState`. The marker is what the SEARCH/INSERT
3178    /// hot paths read to branch between TurboQuant and the legacy
3179    /// brute-force fallback.
3180    #[test]
3181    fn create_vector_marks_collection_as_turbo_baseline() {
3182        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3183        rt.execute_query("CREATE VECTOR embeddings DIM 4").unwrap();
3184        let store = rt.db().store();
3185        assert!(
3186            crate::runtime::vector_turbo_kind::is_turbo(&store, "embeddings"),
3187            "new vector collections must be turbo-marked baseline"
3188        );
3189        assert!(
3190            rt.db().turbo_state("embeddings").is_some(),
3191            "turbo_state must materialise after CREATE VECTOR"
3192        );
3193    }
3194
3195    /// Non-vector collections must NOT be turbo-marked. Guards against
3196    /// the always-baseline change accidentally leaking the marker onto
3197    /// CREATE TABLE / CREATE DOCUMENT / etc.
3198    #[test]
3199    fn create_table_does_not_mark_turbo() {
3200        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3201        rt.execute_query("CREATE TABLE plain (id INT)").unwrap();
3202        let store = rt.db().store();
3203        assert!(
3204            !crate::runtime::vector_turbo_kind::is_turbo(&store, "plain"),
3205            "non-vector collections must not gain the turbo marker"
3206        );
3207        assert!(rt.db().turbo_state("plain").is_none());
3208    }
3209
3210    /// `CREATE COLLECTION KIND vector.turbo` continues to mark the
3211    /// collection (idempotent with the new always-baseline path).
3212    #[test]
3213    fn create_collection_kind_vector_turbo_still_marked() {
3214        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3215        rt.execute_query("CREATE COLLECTION turbo_v KIND vector.turbo DIM 4")
3216            .unwrap();
3217        let store = rt.db().store();
3218        assert!(crate::runtime::vector_turbo_kind::is_turbo(
3219            &store, "turbo_v"
3220        ));
3221        assert!(rt.db().turbo_state("turbo_v").is_some());
3222    }
3223}