Skip to main content

reddb_server/runtime/
impl_ddl.rs

1//! DDL execution: CREATE TABLE, DROP TABLE, ALTER TABLE via SQL AST
2//!
3//! Translates DDL statements into collection-level operations on the
4//! underlying `UnifiedStore`.  RedDB uses a flexible schema-on-read
5//! model, so column definitions are advisory metadata rather than
6//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
15fn vault_master_key_ref(collection: &str) -> String {
16    format!("red.vault.{collection}.master_key")
17}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        // Reject an AI policy that wires a provider/model to a modality it
37        // cannot serve — fail at DDL time, before the collection exists,
38        // using the capability matrix from #1269.
39        if let Some(policy) = &query.ai_policy {
40            validate_ai_policy_modalities(policy)?;
41        }
42        crate::reserved_fields::ensure_no_reserved_public_item_fields(
43            query.columns.iter().map(|column| column.name.as_str()),
44            &format!("table '{}'", query.name),
45        )?;
46        // Check if the collection already exists.
47        let exists = store.get_collection(&query.name).is_some();
48        if exists {
49            if query.if_not_exists {
50                return Ok(RuntimeQueryResult::ok_message(
51                    raw_query.to_string(),
52                    &format!("table '{}' already exists", query.name),
53                    "create",
54                ));
55            }
56            return Err(RedDBError::Query(format!(
57                "table '{}' already exists",
58                query.name
59            )));
60        }
61
62        // Build and validate the contract before mutating storage so invalid
63        // SQL types / duplicate columns do not leave partial side effects.
64        let contract = collection_contract_from_create_table(query)?;
65        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
66        // Create the collection.
67        store
68            .create_collection(&query.name)
69            .map_err(|err| RedDBError::Internal(err.to_string()))?;
70        for subscription in &contract.subscriptions {
71            ensure_event_target_queue(self, &subscription.target_queue)?;
72        }
73        if let Some(default_ttl_ms) = query.default_ttl_ms {
74            self.inner
75                .db
76                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
77        }
78        self.inner
79            .db
80            .save_collection_contract(contract)
81            .map_err(|err| RedDBError::Internal(err.to_string()))?;
82        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
83            store.set_config_tree(
84                &format!("red.collection_tenants.{}", query.name),
85                &crate::serde_json::Value::String(tenant_id),
86            );
87        }
88        self.inner
89            .db
90            .persist_metadata()
91            .map_err(|err| RedDBError::Internal(err.to_string()))?;
92        self.refresh_table_planner_stats(&query.name);
93        self.invalidate_result_cache();
94        // Issue #120 — feed the create into the schema-vocabulary
95        // reverse index so AskPipeline (#121) sees this collection.
96        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
97        self.schema_vocabulary_apply(
98            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
99                collection: query.name.clone(),
100                columns,
101                type_tags: Vec::new(),
102                description: None,
103            },
104        );
105        // Partition metadata (Phase 2.2 PG parity).
106        //
107        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
108        // clause, stamp the partition config into `red_config` under
109        // `partition.{table}.{by,column}`. Children are registered separately
110        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
111        if let Some(spec) = &query.partition_by {
112            let kind_str = match spec.kind {
113                crate::storage::query::ast::PartitionKind::Range => "range",
114                crate::storage::query::ast::PartitionKind::List => "list",
115                crate::storage::query::ast::PartitionKind::Hash => "hash",
116            };
117            store.set_config_tree(
118                &format!("partition.{}.by", query.name),
119                &crate::serde_json::Value::String(kind_str.to_string()),
120            );
121            store.set_config_tree(
122                &format!("partition.{}.column", query.name),
123                &crate::serde_json::Value::String(spec.column.clone()),
124            );
125        }
126
127        // Table-scoped multi-tenancy (Phase 2.5.4).
128        //
129        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
130        //   1. Persists the `tenant_tables.{table}.column` marker so
131        //      INSERTs can auto-fill and future opens re-hydrate.
132        //   2. Registers the table in the in-memory `tenant_tables`
133        //      HashMap used by the DML auto-fill path.
134        //   3. Installs an implicit RLS policy equivalent to
135        //      `USING (col = CURRENT_TENANT())` across all actions.
136        //   4. Flips `rls_enabled_tables` on so the policy applies.
137        if let Some(col) = &query.tenant_by {
138            store.set_config_tree(
139                &format!("tenant_tables.{}.column", query.name),
140                &crate::serde_json::Value::String(col.clone()),
141            );
142            self.register_tenant_table(&query.name, col);
143        }
144
145        let ttl_suffix = query
146            .default_ttl_ms
147            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
148            .unwrap_or_default();
149
150        let tenant_suffix = query
151            .tenant_by
152            .as_ref()
153            .map(|col| format!(" (tenant-scoped by {col})"))
154            .unwrap_or_default();
155
156        Ok(RuntimeQueryResult::ok_message(
157            raw_query.to_string(),
158            &format!(
159                "table '{}' created{}{}",
160                query.name, ttl_suffix, tenant_suffix
161            ),
162            "create",
163        ))
164    }
165
166    fn execute_create_keyed_collection(
167        &self,
168        raw_query: &str,
169        query: &CreateTableQuery,
170    ) -> RedDBResult<RuntimeQueryResult> {
171        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
172        if is_system_schema_name(&query.name) {
173            return Err(RedDBError::Query("system schema is read-only".to_string()));
174        }
175        let store = self.inner.db.store();
176        let label = polymorphic_resolver::model_name(query.collection_model);
177        if store.get_collection(&query.name).is_some() {
178            if query.if_not_exists {
179                return Ok(RuntimeQueryResult::ok_message(
180                    raw_query.to_string(),
181                    &format!("{label} '{}' already exists", query.name),
182                    "create",
183                ));
184            }
185            return Err(RedDBError::Query(format!(
186                "{label} '{}' already exists",
187                query.name
188            )));
189        }
190
191        store
192            .create_collection(&query.name)
193            .map_err(|err| RedDBError::Internal(err.to_string()))?;
194        if query.collection_model == CollectionModel::Vault {
195            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
196            let key_scope = if query.vault_own_master_key {
197                "own"
198            } else {
199                "cluster"
200            };
201            store.set_config_tree(
202                &format!("red.vault.{}.key_scope", query.name),
203                &crate::serde_json::Value::String(key_scope.to_string()),
204            );
205            store.set_config_tree(
206                &format!("red.vault.{}.status", query.name),
207                &crate::serde_json::Value::String("sealed".to_string()),
208            );
209        }
210        if query.collection_model == CollectionModel::Metrics {
211            for spec in &query.metrics_rollup_policies {
212                let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
213                    .ok_or_else(|| {
214                    RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
215                })?;
216                if policy.source != "raw" {
217                    return Err(RedDBError::Query(format!(
218                        "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
219                        spec
220                    )));
221                }
222                if !matches!(
223                    policy.aggregation.as_str(),
224                    "avg" | "sum" | "min" | "max" | "count"
225                ) {
226                    return Err(RedDBError::Query(format!(
227                        "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
228                        spec
229                    )));
230                }
231            }
232            if let Some(raw_retention_ms) = query.default_ttl_ms {
233                self.inner
234                    .db
235                    .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
236                store.set_config_tree(
237                    &format!("red.metrics.{}.raw_retention_ms", query.name),
238                    &crate::serde_json::Value::Number(raw_retention_ms as f64),
239                );
240            }
241            let tenant_identity = query
242                .tenant_by
243                .clone()
244                .unwrap_or_else(|| "current_tenant".to_string());
245            store.set_config_tree(
246                &format!("red.metrics.{}.tenant_identity", query.name),
247                &crate::serde_json::Value::String(tenant_identity),
248            );
249            store.set_config_tree(
250                &format!("red.metrics.{}.namespace", query.name),
251                &crate::serde_json::Value::String("default".to_string()),
252            );
253            if !query.metrics_rollup_policies.is_empty() {
254                store.set_config_tree(
255                    &format!("red.metrics.{}.rollup_policies", query.name),
256                    &crate::serde_json::Value::Array(
257                        query
258                            .metrics_rollup_policies
259                            .iter()
260                            .cloned()
261                            .map(crate::serde_json::Value::String)
262                            .collect(),
263                    ),
264                );
265            }
266        }
267        let contract = if query.collection_model == CollectionModel::Metrics {
268            metrics_collection_contract(query)
269        } else {
270            keyed_collection_contract(
271                &query.name,
272                query.collection_model,
273                query.analytics_config.clone(),
274            )
275        };
276        self.inner
277            .db
278            .save_collection_contract(contract)
279            .map_err(|err| RedDBError::Internal(err.to_string()))?;
280        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
281            store.set_config_tree(
282                &format!("red.collection_tenants.{}", query.name),
283                &crate::serde_json::Value::String(tenant_id),
284            );
285        }
286        self.inner
287            .db
288            .persist_metadata()
289            .map_err(|err| RedDBError::Internal(err.to_string()))?;
290        self.invalidate_result_cache();
291
292        Ok(RuntimeQueryResult::ok_message(
293            raw_query.to_string(),
294            &format!("{label} '{}' created", query.name),
295            "create",
296        ))
297    }
298
299    /// Bootstrap an internal graph collection declared `WITH ANALYTICS`
300    /// (issue #803). The SQL DDL path refuses `red.*` names via
301    /// [`is_system_schema_name`], so built-ins like `red.topology.cluster`
302    /// cannot be created through `CREATE GRAPH`; this method drives the same
303    /// contract-build / save / persist path directly, bypassing only that
304    /// guard. Idempotent: a no-op when the collection already exists (its
305    /// analytics contract is recovered from the WAL on restart), so it is safe
306    /// to call unconditionally on every boot.
307    pub fn ensure_system_graph_with_analytics(
308        &self,
309        name: &str,
310        outputs: &[crate::catalog::AnalyticsOutput],
311    ) -> RedDBResult<()> {
312        let store = self.inner.db.store();
313        if store.get_collection(name).is_some() {
314            return Ok(());
315        }
316        let analytics_config = outputs
317            .iter()
318            .map(|output| crate::catalog::AnalyticsViewDescriptor {
319                output: *output,
320                algorithm: None,
321                resolution: None,
322                max_iterations: None,
323                tolerance: None,
324            })
325            .collect();
326        store
327            .create_collection(name)
328            .map_err(|err| RedDBError::Internal(err.to_string()))?;
329        let contract = keyed_collection_contract(
330            name,
331            crate::catalog::CollectionModel::Graph,
332            analytics_config,
333        );
334        self.inner
335            .db
336            .save_collection_contract(contract)
337            .map_err(|err| RedDBError::Internal(err.to_string()))?;
338        self.inner
339            .db
340            .persist_metadata()
341            .map_err(|err| RedDBError::Internal(err.to_string()))?;
342        self.invalidate_result_cache_process_only();
343        Ok(())
344    }
345
346    pub fn execute_create_collection(
347        &self,
348        raw_query: &str,
349        query: &CreateCollectionQuery,
350    ) -> RedDBResult<RuntimeQueryResult> {
351        let model = match query.kind.as_str() {
352            "graph" => CollectionModel::Graph,
353            "document" => CollectionModel::Document,
354            "metrics" => CollectionModel::Metrics,
355            "vector.turbo" => {
356                let dimension = query.vector_dimension.ok_or_else(|| {
357                    RedDBError::Query(
358                        "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
359                    )
360                })?;
361                let create = CreateVectorQuery {
362                    name: query.name.clone(),
363                    dimension,
364                    metric: query
365                        .vector_metric
366                        .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
367                    if_not_exists: query.if_not_exists,
368                };
369                let result = self.execute_create_vector(raw_query, &create)?;
370                // Issue #693 — durable kind marker that distinguishes
371                // this collection from the legacy `vector` path on
372                // every restart. Runtime state (TurboQuantIndex +
373                // TurboExtent) is lazily materialised by
374                // `RedDB::turbo_state` on first INSERT/SEARCH so the
375                // marker is the only thing that needs to survive.
376                let store = self.inner.db.store();
377                crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
378                self.inner
379                    .db
380                    .persist_metadata()
381                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
382                // Eagerly construct the state so the TurboExtent (if
383                // any) is reserved at creation time rather than on
384                // first INSERT. Errors are swallowed: the lazy path in
385                // `turbo_state` will retry on the next access.
386                let _ = self.inner.db.turbo_state(&query.name);
387                return Ok(result);
388            }
389            // KIND blockchain — issue #523 foundation: stored on top of a
390            // Table-shaped collection. The `chain` marker + reserved-column
391            // discipline make the difference. Schema validation, conflict
392            // retries, and `verify_chain` come in later iterations.
393            "blockchain" => CollectionModel::Table,
394            other => {
395                return Err(RedDBError::Query(format!(
396                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
397                )));
398            }
399        };
400        let create = CreateTableQuery {
401            collection_model: model,
402            name: query.name.clone(),
403            columns: Vec::new(),
404            if_not_exists: query.if_not_exists,
405            default_ttl_ms: None,
406            metrics_rollup_policies: Vec::new(),
407            context_index_fields: Vec::new(),
408            context_index_enabled: false,
409            timestamps: false,
410            partition_by: None,
411            tenant_by: None,
412            append_only: false,
413            subscriptions: Vec::new(),
414            analytics_config: Vec::new(),
415            vault_own_master_key: false,
416            ai_policy: None,
417        };
418        let result = self.execute_create_table(raw_query, &create)?;
419        if query.kind == "blockchain" {
420            self.install_blockchain_kind(&query.name)?;
421        }
422        // Issue #522 — wire `SIGNED_BY (...)` into the runtime. The parser
423        // already produces a validated 32-byte pubkey list; installing
424        // the registry stamps the per-collection signer set into
425        // `red_config` so the INSERT path can load it cheaply.
426        if !query.allowed_signers.is_empty() {
427            let actor = crate::runtime::impl_core::current_user_projected()
428                .unwrap_or_else(|| "@system/create-collection".to_string());
429            crate::runtime::signed_writes_kind::install(
430                &self.inner.db.store(),
431                &query.name,
432                &query.allowed_signers,
433                &actor,
434            );
435        }
436        Ok(result)
437    }
438
439    /// Stamp `red.collection.{name}.kind = "chain"` and append the genesis
440    /// row. Idempotent against `IF NOT EXISTS`: if the collection already
441    /// has a row at height 0 we leave it.
442    fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
443        use crate::runtime::blockchain_kind;
444        use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
445        use std::sync::Arc;
446
447        let store = self.inner.db.store();
448        blockchain_kind::mark_as_chain(&store, name);
449
450        let existing_tip = blockchain_kind::chain_tip(&store, name);
451        if existing_tip.height.is_some() {
452            return Ok(());
453        }
454
455        let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
456        let named: std::collections::HashMap<String, crate::storage::schema::Value> =
457            fields.into_iter().collect();
458        let entity = UnifiedEntity::new(
459            EntityId::new(0),
460            EntityKind::TableRow {
461                table: Arc::from(name),
462                row_id: 0,
463            },
464            EntityData::Row(RowData {
465                columns: Vec::new(),
466                named: Some(named),
467                schema: None,
468            }),
469        );
470        store
471            .insert_auto(name, entity)
472            .map_err(|err| RedDBError::Internal(err.to_string()))?;
473        // #524: prime the in-memory tip cache so the chain-tip endpoint and
474        // subsequent INSERTs don't have to scan the collection to find genesis.
475        if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
476            self.inner
477                .chain_tip_cache
478                .lock()
479                .insert(name.to_string(), tip);
480        }
481        Ok(())
482    }
483
484    pub fn execute_create_vector(
485        &self,
486        raw_query: &str,
487        query: &CreateVectorQuery,
488    ) -> RedDBResult<RuntimeQueryResult> {
489        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
490        if is_system_schema_name(&query.name) {
491            return Err(RedDBError::Query("system schema is read-only".to_string()));
492        }
493        let store = self.inner.db.store();
494        if store.get_collection(&query.name).is_some() {
495            if query.if_not_exists {
496                return Ok(RuntimeQueryResult::ok_message(
497                    raw_query.to_string(),
498                    &format!("vector '{}' already exists", query.name),
499                    "create",
500                ));
501            }
502            return Err(RedDBError::Query(format!(
503                "vector '{}' already exists",
504                query.name
505            )));
506        }
507
508        store
509            .create_collection(&query.name)
510            .map_err(|err| RedDBError::Internal(err.to_string()))?;
511        self.inner
512            .db
513            .save_collection_contract(vector_collection_contract(query))
514            .map_err(|err| RedDBError::Internal(err.to_string()))?;
515        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
516            store.set_config_tree(
517                &format!("red.collection_tenants.{}", query.name),
518                &crate::serde_json::Value::String(tenant_id),
519            );
520        }
521        // Slice G (#675) — register `vector.turbo` as the built-in
522        // baseline for every newly-created vector collection. The
523        // marker is the durable signal `RedDB::turbo_state` reads to
524        // materialise the TurboQuantIndex + TurboExtent on first
525        // INSERT/SEARCH. Pre-existing vector collections were created
526        // without the marker and stay on the brute-force fallback path
527        // until they are explicitly migrated — there is no implicit
528        // backfill in this slice.
529        crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
530        self.inner
531            .db
532            .persist_metadata()
533            .map_err(|err| RedDBError::Internal(err.to_string()))?;
534        // Eagerly materialise the per-collection turbo state so the
535        // TurboExtent (when the store is paged) is reserved at CREATE
536        // time rather than on the first INSERT. Failures are swallowed
537        // — the lazy path in `turbo_state` retries on next access.
538        let _ = self.inner.db.turbo_state(&query.name);
539        self.invalidate_result_cache();
540
541        Ok(RuntimeQueryResult::ok_message(
542            raw_query.to_string(),
543            &format!("vector '{}' created", query.name),
544            "create",
545        ))
546    }
547
548    fn provision_vault_key_material(
549        &self,
550        collection: &str,
551        own_master_key: bool,
552    ) -> RedDBResult<()> {
553        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
554            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
555        })?;
556        if !auth_store.is_vault_backed() {
557            return Err(RedDBError::Query(
558                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
559            ));
560        }
561
562        if auth_store.vault_secret_key().is_none() {
563            let key = crate::auth::store::random_bytes(32);
564            auth_store
565                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
566                .map_err(|err| RedDBError::Query(err.to_string()))?;
567        }
568
569        if own_master_key {
570            let key = crate::auth::store::random_bytes(32);
571            auth_store
572                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
573                .map_err(|err| RedDBError::Query(err.to_string()))?;
574        }
575
576        Ok(())
577    }
578
579    /// Execute DROP TABLE
580    ///
581    /// Drops the collection and all its data from the store.
582    pub fn execute_drop_table(
583        &self,
584        raw_query: &str,
585        query: &DropTableQuery,
586    ) -> RedDBResult<RuntimeQueryResult> {
587        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
588        let store = self.inner.db.store();
589
590        if is_system_schema_name(&query.name) {
591            return Err(RedDBError::Query("system schema is read-only".to_string()));
592        }
593
594        let exists = store.get_collection(&query.name).is_some();
595        if !exists {
596            if query.if_exists {
597                return Ok(RuntimeQueryResult::ok_message(
598                    raw_query.to_string(),
599                    &format!("table '{}' does not exist", query.name),
600                    "drop",
601                ));
602            }
603            return Err(RedDBError::NotFound(format!(
604                "table '{}' not found",
605                query.name
606            )));
607        }
608        let actual =
609            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
610        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
611
612        // Emit 1 collection_dropped event before storage is wiped.
613        // Queue is preserved; subscription is removed with the contract below.
614        let final_count = store
615            .get_collection(&query.name)
616            .map(|manager| manager.query_all(|_| true).len() as u64)
617            .unwrap_or(0);
618        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
619            self,
620            &query.name,
621            final_count,
622        )?;
623
624        let orphaned_indices: Vec<String> = self
625            .inner
626            .index_store
627            .list_indices(&query.name)
628            .into_iter()
629            .map(|index| index.name)
630            .collect();
631        for name in &orphaned_indices {
632            self.inner.index_store.drop_index(name, &query.name);
633        }
634
635        store
636            .drop_collection(&query.name)
637            .map_err(|err| RedDBError::Internal(err.to_string()))?;
638        self.inner.db.invalidate_vector_index(&query.name);
639        self.inner.db.clear_collection_default_ttl_ms(&query.name);
640        self.inner
641            .db
642            .remove_collection_contract(&query.name)
643            .map_err(|err| RedDBError::Internal(err.to_string()))?;
644        self.clear_table_planner_stats(&query.name);
645        self.invalidate_result_cache();
646        // Issue #119: a dropped collection vanishes from every
647        // (tenant, role)'s visible-collections set. Auth is optional in
648        // embedded mode so guard the call.
649        if let Some(store) = self.inner.auth_store.read().clone() {
650            store.invalidate_visible_collections_cache();
651        }
652        self.inner
653            .db
654            .persist_metadata()
655            .map_err(|err| RedDBError::Internal(err.to_string()))?;
656        // Issue #120 — drop both the collection entries *and* every
657        // index entry that was scoped to this collection.  Dropping the
658        // collection wipes columns + collection-name + type-tags +
659        // index hits in one pass via `purge_collection_entries`, so
660        // the explicit `DropIndex` calls would be redundant.
661        self.schema_vocabulary_apply(
662            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
663                collection: query.name.clone(),
664            },
665        );
666
667        Ok(RuntimeQueryResult::ok_message(
668            raw_query.to_string(),
669            &format!("table '{}' dropped", query.name),
670            "drop",
671        ))
672    }
673
674    pub fn execute_drop_graph(
675        &self,
676        raw_query: &str,
677        query: &DropGraphQuery,
678    ) -> RedDBResult<RuntimeQueryResult> {
679        self.execute_drop_typed_collection(
680            raw_query,
681            &query.name,
682            query.if_exists,
683            CollectionModel::Graph,
684            "graph",
685        )
686    }
687
688    pub fn execute_drop_vector(
689        &self,
690        raw_query: &str,
691        query: &DropVectorQuery,
692    ) -> RedDBResult<RuntimeQueryResult> {
693        self.execute_drop_typed_collection(
694            raw_query,
695            &query.name,
696            query.if_exists,
697            CollectionModel::Vector,
698            "vector",
699        )
700    }
701
702    pub fn execute_drop_document(
703        &self,
704        raw_query: &str,
705        query: &DropDocumentQuery,
706    ) -> RedDBResult<RuntimeQueryResult> {
707        self.execute_drop_typed_collection(
708            raw_query,
709            &query.name,
710            query.if_exists,
711            CollectionModel::Document,
712            "document",
713        )
714    }
715
716    pub fn execute_drop_kv(
717        &self,
718        raw_query: &str,
719        query: &DropKvQuery,
720    ) -> RedDBResult<RuntimeQueryResult> {
721        let label = polymorphic_resolver::model_name(query.model);
722        self.execute_drop_typed_collection(
723            raw_query,
724            &query.name,
725            query.if_exists,
726            query.model,
727            label,
728        )
729    }
730
731    pub fn execute_drop_collection(
732        &self,
733        raw_query: &str,
734        query: &DropCollectionQuery,
735    ) -> RedDBResult<RuntimeQueryResult> {
736        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
737        if is_system_schema_name(&query.name) {
738            return Err(RedDBError::Query("system schema is read-only".to_string()));
739        }
740        let store = self.inner.db.store();
741        if store.get_collection(&query.name).is_none() {
742            if query.if_exists {
743                return Ok(RuntimeQueryResult::ok_message(
744                    raw_query.to_string(),
745                    &format!("collection '{}' does not exist", query.name),
746                    "drop",
747                ));
748            }
749            return Err(RedDBError::NotFound(format!(
750                "collection '{}' not found",
751                query.name
752            )));
753        }
754
755        let actual =
756            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
757        if let Some(expected) = query.model {
758            polymorphic_resolver::ensure_model_match(expected, actual)?;
759        }
760
761        match actual {
762            CollectionModel::Table => self.execute_drop_table(
763                raw_query,
764                &DropTableQuery {
765                    name: query.name.clone(),
766                    if_exists: query.if_exists,
767                },
768            ),
769            CollectionModel::TimeSeries => self.execute_drop_timeseries(
770                raw_query,
771                &DropTimeSeriesQuery {
772                    name: query.name.clone(),
773                    if_exists: query.if_exists,
774                },
775            ),
776            CollectionModel::Queue => self.execute_drop_queue(
777                raw_query,
778                &DropQueueQuery {
779                    name: query.name.clone(),
780                    if_exists: query.if_exists,
781                },
782            ),
783            CollectionModel::Graph => self.execute_drop_graph(
784                raw_query,
785                &DropGraphQuery {
786                    name: query.name.clone(),
787                    if_exists: query.if_exists,
788                },
789            ),
790            CollectionModel::Vector => self.execute_drop_vector(
791                raw_query,
792                &DropVectorQuery {
793                    name: query.name.clone(),
794                    if_exists: query.if_exists,
795                },
796            ),
797            CollectionModel::Document => self.execute_drop_document(
798                raw_query,
799                &DropDocumentQuery {
800                    name: query.name.clone(),
801                    if_exists: query.if_exists,
802                },
803            ),
804            CollectionModel::Kv => self.execute_drop_kv(
805                raw_query,
806                &DropKvQuery {
807                    name: query.name.clone(),
808                    if_exists: query.if_exists,
809                    model: CollectionModel::Kv,
810                },
811            ),
812            CollectionModel::Config => self.execute_drop_kv(
813                raw_query,
814                &DropKvQuery {
815                    name: query.name.clone(),
816                    if_exists: query.if_exists,
817                    model: CollectionModel::Config,
818                },
819            ),
820            CollectionModel::Vault => self.execute_drop_kv(
821                raw_query,
822                &DropKvQuery {
823                    name: query.name.clone(),
824                    if_exists: query.if_exists,
825                    model: CollectionModel::Vault,
826                },
827            ),
828            CollectionModel::Hll => self.execute_probabilistic_command(
829                raw_query,
830                &ProbabilisticCommand::DropHll {
831                    name: query.name.clone(),
832                    if_exists: query.if_exists,
833                },
834            ),
835            CollectionModel::Sketch => self.execute_probabilistic_command(
836                raw_query,
837                &ProbabilisticCommand::DropSketch {
838                    name: query.name.clone(),
839                    if_exists: query.if_exists,
840                },
841            ),
842            CollectionModel::Filter => self.execute_probabilistic_command(
843                raw_query,
844                &ProbabilisticCommand::DropFilter {
845                    name: query.name.clone(),
846                    if_exists: query.if_exists,
847                },
848            ),
849            CollectionModel::Metrics => self.execute_drop_typed_collection(
850                raw_query,
851                &query.name,
852                query.if_exists,
853                CollectionModel::Metrics,
854                "metrics",
855            ),
856            CollectionModel::Mixed => self.execute_drop_typed_collection(
857                raw_query,
858                &query.name,
859                query.if_exists,
860                CollectionModel::Mixed,
861                "collection",
862            ),
863        }
864    }
865
866    /// Issue #801 — guard `ALTER GRAPH ... ADD|DROP ANALYTICS` so analytics
867    /// lifecycle mutations only land on a collection declared as a graph. The
868    /// `<graph>.<output>` resolver only consults `analytics_config` on
869    /// graph-model contracts, so allowing the op on any other model would write
870    /// dead config the reader can never surface.
871    fn require_graph_for_analytics(&self, name: &str) -> RedDBResult<()> {
872        let is_graph = self
873            .inner
874            .db
875            .collection_contract(name)
876            .map(|c| c.declared_model == crate::catalog::CollectionModel::Graph)
877            .unwrap_or(false);
878        if !is_graph {
879            return Err(RedDBError::Query(format!(
880                "ALTER GRAPH ... ANALYTICS: '{name}' is not a graph collection"
881            )));
882        }
883        Ok(())
884    }
885
886    /// Execute ALTER TABLE
887    ///
888    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
889    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
890    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
891    pub fn execute_alter_table(
892        &self,
893        raw_query: &str,
894        query: &AlterTableQuery,
895    ) -> RedDBResult<RuntimeQueryResult> {
896        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
897        let store = self.inner.db.store();
898
899        // Verify the table exists.
900        if store.get_collection(&query.name).is_none() {
901            return Err(RedDBError::NotFound(format!(
902                "table '{}' not found",
903                query.name
904            )));
905        }
906
907        let mut messages = Vec::new();
908
909        // Collect column-level changes upfront for schema-change event emission below.
910        let fields_added: Vec<String> = query
911            .operations
912            .iter()
913            .filter_map(|op| {
914                if let AlterOperation::AddColumn(col) = op {
915                    Some(col.name.clone())
916                } else {
917                    None
918                }
919            })
920            .collect();
921        let fields_removed: Vec<String> = query
922            .operations
923            .iter()
924            .filter_map(|op| {
925                if let AlterOperation::DropColumn(name) = op {
926                    Some(name.clone())
927                } else {
928                    None
929                }
930            })
931            .collect();
932
933        for op in &query.operations {
934            match op {
935                AlterOperation::AddColumn(col) => {
936                    // Schema-on-read: column will be available on next insert.
937                    messages.push(format!("column '{}' added", col.name));
938                }
939                AlterOperation::DropColumn(name) => {
940                    messages.push(format!("column '{}' dropped", name));
941                }
942                AlterOperation::RenameColumn { from, to } => {
943                    messages.push(format!("column '{}' renamed to '{}'", from, to));
944                }
945                AlterOperation::AttachPartition { child, bound } => {
946                    // Persist child → parent binding in red_config so the
947                    // future planner-side pruner can enumerate children and
948                    // evaluate their bounds.
949                    store.set_config_tree(
950                        &format!("partition.{}.children.{}", query.name, child),
951                        &crate::serde_json::Value::String(bound.clone()),
952                    );
953                    messages.push(format!(
954                        "partition '{child}' attached to '{}' ({bound})",
955                        query.name
956                    ));
957                }
958                AlterOperation::DetachPartition { child } => {
959                    store.set_config_tree(
960                        &format!("partition.{}.children.{}", query.name, child),
961                        &crate::serde_json::Value::Null,
962                    );
963                    messages.push(format!(
964                        "partition '{child}' detached from '{}'",
965                        query.name
966                    ));
967                }
968                AlterOperation::EnableRowLevelSecurity => {
969                    self.inner
970                        .rls_enabled_tables
971                        .write()
972                        .insert(query.name.clone());
973                    // Persist flag so RLS survives restart via red_config.
974                    store.set_config_tree(
975                        &format!("rls.enabled.{}", query.name),
976                        &crate::serde_json::Value::Bool(true),
977                    );
978                    self.invalidate_plan_cache();
979                    messages.push(format!("row level security enabled on '{}'", query.name));
980                }
981                AlterOperation::DisableRowLevelSecurity => {
982                    self.inner.rls_enabled_tables.write().remove(&query.name);
983                    store.set_config_tree(
984                        &format!("rls.enabled.{}", query.name),
985                        &crate::serde_json::Value::Null,
986                    );
987                    self.invalidate_plan_cache();
988                    messages.push(format!("row level security disabled on '{}'", query.name));
989                }
990                // Phase 2.5.4: retrofit tenancy onto an existing table.
991                AlterOperation::EnableTenancy { column } => {
992                    store.set_config_tree(
993                        &format!("tenant_tables.{}.column", query.name),
994                        &crate::serde_json::Value::String(column.clone()),
995                    );
996                    self.register_tenant_table(&query.name, column);
997                    self.invalidate_plan_cache();
998                    messages.push(format!(
999                        "tenancy enabled on '{}' by column '{column}'",
1000                        query.name
1001                    ));
1002                }
1003                AlterOperation::DisableTenancy => {
1004                    store.set_config_tree(
1005                        &format!("tenant_tables.{}.column", query.name),
1006                        &crate::serde_json::Value::Null,
1007                    );
1008                    self.unregister_tenant_table(&query.name);
1009                    self.invalidate_plan_cache();
1010                    messages.push(format!("tenancy disabled on '{}'", query.name));
1011                }
1012                AlterOperation::SetAppendOnly(on) => {
1013                    // Contract is the single source of truth for the
1014                    // UPDATE/DELETE parse-time guard. The flag lands
1015                    // below via `apply_alter_operations_to_contract`;
1016                    // here we only publish the human-readable message.
1017                    messages.push(format!(
1018                        "append_only {} on '{}'",
1019                        if *on { "enabled" } else { "disabled" },
1020                        query.name
1021                    ));
1022                }
1023                AlterOperation::SetVersioned(on) => {
1024                    // Opt the collection into (or out of) Git-for-Data.
1025                    // Persists a row in red_vcs_settings; next AS OF /
1026                    // merge / diff against this table honours the
1027                    // flag. Retroactive: existing row versions whose
1028                    // xmins are still pinned by commits become
1029                    // reachable via AS OF COMMIT immediately.
1030                    self.vcs_set_versioned(&query.name, *on)?;
1031                    messages.push(format!(
1032                        "versioned {} on '{}'",
1033                        if *on { "enabled" } else { "disabled" },
1034                        query.name
1035                    ));
1036                }
1037                AlterOperation::EnableEvents(subscription) => {
1038                    let mut subscription = subscription.clone();
1039                    subscription.source = query.name.clone();
1040                    validate_event_subscriptions(
1041                        self,
1042                        &query.name,
1043                        std::slice::from_ref(&subscription),
1044                    )?;
1045                    ensure_event_target_queue(self, &subscription.target_queue)?;
1046                    messages.push(format!(
1047                        "events enabled on '{}' to '{}'",
1048                        query.name, subscription.target_queue
1049                    ));
1050                }
1051                AlterOperation::DisableEvents => {
1052                    messages.push(format!("events disabled on '{}'", query.name));
1053                }
1054                AlterOperation::AddSubscription { name, descriptor } => {
1055                    let mut sub = descriptor.clone();
1056                    sub.name = name.clone();
1057                    sub.source = query.name.clone();
1058                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
1059                    ensure_event_target_queue(self, &sub.target_queue)?;
1060                    messages.push(format!(
1061                        "subscription '{}' added on '{}' to '{}'",
1062                        name, query.name, sub.target_queue
1063                    ));
1064                }
1065                AlterOperation::DropSubscription { name } => {
1066                    messages.push(format!(
1067                        "subscription '{}' dropped on '{}'",
1068                        name, query.name
1069                    ));
1070                }
1071                AlterOperation::AddSigner { pubkey } => {
1072                    // Issue #522 — admin-gated registry mutation. The
1073                    // standard DDL `check_write` above gates by role; we
1074                    // additionally verify the collection actually has a
1075                    // signed-writes registry installed so this isn't a
1076                    // covert way to retrofit one (use `CREATE COLLECTION
1077                    // ... SIGNED_BY (...)` for that).
1078                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1079                        return Err(RedDBError::Query(format!(
1080                            "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
1081                             recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
1082                            query.name
1083                        )));
1084                    }
1085                    let actor = crate::runtime::impl_core::current_user_projected()
1086                        .unwrap_or_else(|| "@system/alter".to_string());
1087                    let changed = crate::runtime::signed_writes_kind::add_signer(
1088                        &store,
1089                        &query.name,
1090                        *pubkey,
1091                        &actor,
1092                    );
1093                    messages.push(format!(
1094                        "signer {} on '{}'",
1095                        if changed { "added" } else { "already present" },
1096                        query.name
1097                    ));
1098                }
1099                AlterOperation::RevokeSigner { pubkey } => {
1100                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1101                        return Err(RedDBError::Query(format!(
1102                            "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1103                            query.name
1104                        )));
1105                    }
1106                    let actor = crate::runtime::impl_core::current_user_projected()
1107                        .unwrap_or_else(|| "@system/alter".to_string());
1108                    let changed = crate::runtime::signed_writes_kind::revoke_signer(
1109                        &store,
1110                        &query.name,
1111                        pubkey,
1112                        &actor,
1113                    );
1114                    messages.push(format!(
1115                        "signer {} on '{}'",
1116                        if changed {
1117                            "revoked"
1118                        } else {
1119                            "already revoked"
1120                        },
1121                        query.name
1122                    ));
1123                }
1124                AlterOperation::SetRetention { duration_ms } => {
1125                    // Issue #580 — validate that the collection has a
1126                    // timestamp column the lazy-on-scan filter can key
1127                    // off. Without one there is no anchor for "older
1128                    // than now - duration"; reject at ALTER time so the
1129                    // policy can never silently hide all rows.
1130                    let existing = self.inner.db.collection_contract(&query.name);
1131                    let has_ts_column = existing
1132                        .as_ref()
1133                        .map(retention_timestamp_column_exists)
1134                        .unwrap_or(false);
1135                    if !has_ts_column {
1136                        return Err(RedDBError::Query(format!(
1137                            "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1138                             column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1139                             or enable WITH timestamps = true before setting a \
1140                             retention policy",
1141                            query.name
1142                        )));
1143                    }
1144                    messages.push(format!(
1145                        "retention set to {duration_ms} ms on '{}'",
1146                        query.name
1147                    ));
1148                }
1149                AlterOperation::UnsetRetention => {
1150                    messages.push(format!("retention cleared on '{}'", query.name));
1151                }
1152                AlterOperation::AddAnalytics(views) => {
1153                    // Issue #801 — analytics lifecycle is graph-only. Reject the
1154                    // op on a non-graph collection so a misrouted ALTER fails
1155                    // loudly instead of silently writing analytics_config onto a
1156                    // model whose reader never resolves `<name>.<output>`.
1157                    self.require_graph_for_analytics(&query.name)?;
1158                    let existing = self.inner.db.collection_contract(&query.name);
1159                    let enabled: std::collections::BTreeSet<crate::catalog::AnalyticsOutput> =
1160                        existing
1161                            .as_ref()
1162                            .map(|c| c.analytics_config.iter().map(|v| v.output).collect())
1163                            .unwrap_or_default();
1164                    for view in views {
1165                        if enabled.contains(&view.output) {
1166                            // Idempotent: adding an already-enabled output is a
1167                            // no-op — no error, no duplicate state.
1168                            messages.push(format!(
1169                                "analytics '{}' already enabled on '{}'",
1170                                view.output.as_str(),
1171                                query.name
1172                            ));
1173                        } else {
1174                            messages.push(format!(
1175                                "analytics '{}' enabled on '{}'",
1176                                view.output.as_str(),
1177                                query.name
1178                            ));
1179                        }
1180                    }
1181                }
1182                AlterOperation::DropAnalytics(output) => {
1183                    self.require_graph_for_analytics(&query.name)?;
1184                    let enabled = self
1185                        .inner
1186                        .db
1187                        .collection_contract(&query.name)
1188                        .map(|c| c.analytics_config.iter().any(|v| v.output == *output))
1189                        .unwrap_or(false);
1190                    if !enabled {
1191                        // Dropping an output that was never enabled is a clean,
1192                        // explicit error (AC4) rather than a silent no-op.
1193                        return Err(RedDBError::Query(format!(
1194                            "ALTER GRAPH DROP ANALYTICS: analytics output '{}' is not enabled on graph '{}'",
1195                            output.as_str(),
1196                            query.name
1197                        )));
1198                    }
1199                    messages.push(format!(
1200                        "analytics '{}' disabled on '{}'",
1201                        output.as_str(),
1202                        query.name
1203                    ));
1204                }
1205            }
1206        }
1207
1208        let mut contract = self
1209            .inner
1210            .db
1211            .collection_contract(&query.name)
1212            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1213        apply_alter_operations_to_contract(&mut contract, &query.operations);
1214        contract.version = contract.version.saturating_add(1);
1215        contract.updated_at_unix_ms = current_unix_ms();
1216        self.inner
1217            .db
1218            .save_collection_contract(contract)
1219            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1220        // Issue #301 — emit OperatorEvent when column schema changes on a
1221        // collection that has active event subscriptions, so operators know
1222        // downstream consumers may see a different payload shape.
1223        if !fields_added.is_empty() || !fields_removed.is_empty() {
1224            let sub_names: Vec<String> = self
1225                .inner
1226                .db
1227                .collection_contract(&query.name)
1228                .map(|c| {
1229                    c.subscriptions
1230                        .iter()
1231                        .filter(|s| s.enabled)
1232                        .map(|s| s.name.clone())
1233                        .collect()
1234                })
1235                .unwrap_or_default();
1236            if !sub_names.is_empty() {
1237                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1238                    collection: query.name.clone(),
1239                    subscription_names: sub_names.join(", "),
1240                    fields_added: fields_added.join(", "),
1241                    fields_removed: fields_removed.join(", "),
1242                    lsn: self.cdc_current_lsn(),
1243                }
1244                .emit_global();
1245            }
1246        }
1247
1248        self.clear_table_planner_stats(&query.name);
1249        self.invalidate_result_cache();
1250        // Issue #120 — refresh the schema-vocabulary entries from the
1251        // post-ALTER contract. Drop+recreate inside the index keeps
1252        // the invalidation guarantee complete (no stale columns from
1253        // before an ALTER ... DROP COLUMN).
1254        let post_alter_columns: Vec<String> = self
1255            .inner
1256            .db
1257            .collection_contract(&query.name)
1258            .map(|contract| {
1259                contract
1260                    .declared_columns
1261                    .iter()
1262                    .map(|col| col.name.clone())
1263                    .collect()
1264            })
1265            .unwrap_or_default();
1266        self.schema_vocabulary_apply(
1267            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1268                collection: query.name.clone(),
1269                columns: post_alter_columns,
1270                type_tags: Vec::new(),
1271                description: None,
1272            },
1273        );
1274
1275        let message = if messages.is_empty() {
1276            format!("table '{}' altered (no operations)", query.name)
1277        } else {
1278            format!("table '{}' altered: {}", query.name, messages.join(", "))
1279        };
1280
1281        Ok(RuntimeQueryResult::ok_message(
1282            raw_query.to_string(),
1283            &message,
1284            "alter",
1285        ))
1286    }
1287
1288    /// Execute EXPLAIN ALTER FOR CREATE TABLE
1289    ///
1290    /// Pure read: computes the schema diff between the target table's
1291    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
1292    /// and returns it as SQL `ALTER TABLE` text (default) or structured
1293    /// JSON. Never mutates storage.
1294    pub fn execute_explain_alter(
1295        &self,
1296        raw_query: &str,
1297        query: &ExplainAlterQuery,
1298    ) -> RedDBResult<RuntimeQueryResult> {
1299        // Validate the target CREATE TABLE body so syntactically valid
1300        // but semantically broken targets (bad SQL types, duplicate
1301        // columns) are caught here rather than inside the diff engine.
1302        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1303
1304        let current_contract = self.inner.db.collection_contract(&query.target.name);
1305
1306        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1307            .as_ref()
1308            .map(|c| c.declared_columns.clone())
1309            .unwrap_or_default();
1310
1311        let diff = super::schema_diff::compute_column_diff(
1312            &query.target.name,
1313            &current_columns,
1314            &query.target.columns,
1315        );
1316
1317        let rendered = match query.format {
1318            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1319            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1320        };
1321
1322        let format_label = match query.format {
1323            ExplainFormat::Sql => "sql",
1324            ExplainFormat::Json => "json",
1325        };
1326
1327        let columns = vec![
1328            "table".to_string(),
1329            "format".to_string(),
1330            "diff".to_string(),
1331        ];
1332        let row = vec![
1333            ("table".to_string(), Value::text(query.target.name.clone())),
1334            ("format".to_string(), Value::text(format_label.to_string())),
1335            ("diff".to_string(), Value::text(rendered)),
1336        ];
1337
1338        Ok(RuntimeQueryResult::ok_records(
1339            raw_query.to_string(),
1340            columns,
1341            vec![row],
1342            "explain",
1343        ))
1344    }
1345
1346    /// Execute CREATE INDEX
1347    ///
1348    /// Registers a new index on a collection, builds it from existing data,
1349    /// and makes it available to the query executor for O(1) lookups.
1350    pub fn execute_create_index(
1351        &self,
1352        raw_query: &str,
1353        query: &CreateIndexQuery,
1354    ) -> RedDBResult<RuntimeQueryResult> {
1355        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1356        let store = self.inner.db.store();
1357
1358        // Verify the table exists
1359        let manager = store
1360            .get_collection(&query.table)
1361            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1362
1363        let method_kind = match query.method {
1364            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1365            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1366            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1367            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1368        };
1369
1370        // Extract fields from existing entities for indexing. Row
1371        // entities may arrive in either the "named" HashMap layout
1372        // (gRPC `BulkInsertBinary` path) OR the columnar layout
1373        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
1374        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
1375        // leaves `named == None`). Prior to this commit the columnar
1376        // branch returned an empty field list, so `CREATE INDEX` built
1377        // a zero-entity index over HTTP-inserted data even though the
1378        // data was queryable via `SELECT`.
1379        let entities = manager.query_all(|_| true);
1380        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1381            entities
1382                .iter()
1383                .map(|e| {
1384                    let fields = match &e.data {
1385                        crate::storage::EntityData::Row(row) => {
1386                            if let Some(ref named) = row.named {
1387                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1388                            } else if let Some(ref schema) = row.schema {
1389                                // Columnar layout — pair each column
1390                                // with its positional name from the
1391                                // shared schema Arc.
1392                                schema
1393                                    .iter()
1394                                    .zip(row.columns.iter())
1395                                    .map(|(k, v)| (k.clone(), v.clone()))
1396                                    .collect()
1397                            } else {
1398                                Vec::new()
1399                            }
1400                        }
1401                        crate::storage::EntityData::Node(node) => node
1402                            .properties
1403                            .iter()
1404                            .map(|(k, v)| (k.clone(), v.clone()))
1405                            .collect(),
1406                        _ => Vec::new(),
1407                    };
1408                    (e.id, fields)
1409                })
1410                .collect();
1411
1412        // Build the index
1413        let indexed_count = self
1414            .inner
1415            .index_store
1416            .create_index(
1417                &query.name,
1418                &query.table,
1419                &query.columns,
1420                method_kind,
1421                query.unique,
1422                &entity_fields,
1423            )
1424            .map_err(RedDBError::Internal)?;
1425
1426        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1427            &query.table,
1428            &entity_fields,
1429        );
1430        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1431        self.invalidate_plan_cache();
1432
1433        // Register metadata
1434        self.inner
1435            .index_store
1436            .register(super::index_store::RegisteredIndex {
1437                name: query.name.clone(),
1438                collection: query.table.clone(),
1439                columns: query.columns.clone(),
1440                method: method_kind,
1441                unique: query.unique,
1442            });
1443        self.persist_runtime_index_descriptor(super::index_store::RegisteredIndex {
1444            name: query.name.clone(),
1445            collection: query.table.clone(),
1446            columns: query.columns.clone(),
1447            method: method_kind,
1448            unique: query.unique,
1449        })?;
1450        // Issue #120 — surface the index name + indexed columns in
1451        // the schema-vocabulary so AskPipeline (#121) can resolve
1452        // "the email index" back to its collection.
1453        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1454            collection: query.table.clone(),
1455            index: query.name.clone(),
1456            columns: query.columns.clone(),
1457        });
1458
1459        let method_str = format!("{}", query.method);
1460        let unique_str = if query.unique { "unique " } else { "" };
1461        let cols = query.columns.join(", ");
1462
1463        Ok(RuntimeQueryResult::ok_message(
1464            raw_query.to_string(),
1465            &format!(
1466                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1467                unique_str, query.name, query.table, cols, method_str, indexed_count
1468            ),
1469            "create",
1470        ))
1471    }
1472
1473    /// Execute DROP INDEX
1474    ///
1475    /// Removes an index from a collection.
1476    pub fn execute_drop_index(
1477        &self,
1478        raw_query: &str,
1479        query: &DropIndexQuery,
1480    ) -> RedDBResult<RuntimeQueryResult> {
1481        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1482        let store = self.inner.db.store();
1483
1484        // Verify the table exists
1485        if store.get_collection(&query.table).is_none() {
1486            if query.if_exists {
1487                return Ok(RuntimeQueryResult::ok_message(
1488                    raw_query.to_string(),
1489                    &format!("table '{}' does not exist", query.table),
1490                    "drop",
1491                ));
1492            }
1493            return Err(RedDBError::NotFound(format!(
1494                "table '{}' not found",
1495                query.table
1496            )));
1497        }
1498
1499        // Remove from IndexStore
1500        self.inner.index_store.drop_index(&query.name, &query.table);
1501        self.persist_runtime_index_drop(&query.table, &query.name)?;
1502        self.invalidate_plan_cache();
1503        // Issue #120 — keep the schema-vocabulary index entry in sync.
1504        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1505            collection: query.table.clone(),
1506            index: query.name.clone(),
1507        });
1508
1509        Ok(RuntimeQueryResult::ok_message(
1510            raw_query.to_string(),
1511            &format!("index '{}' dropped from '{}'", query.name, query.table),
1512            "drop",
1513        ))
1514    }
1515
1516    fn execute_drop_typed_collection(
1517        &self,
1518        raw_query: &str,
1519        name: &str,
1520        if_exists: bool,
1521        expected_model: CollectionModel,
1522        label: &str,
1523    ) -> RedDBResult<RuntimeQueryResult> {
1524        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1525        if is_system_schema_name(name) {
1526            return Err(RedDBError::Query("system schema is read-only".to_string()));
1527        }
1528        let store = self.inner.db.store();
1529        if store.get_collection(name).is_none() {
1530            if if_exists {
1531                return Ok(RuntimeQueryResult::ok_message(
1532                    raw_query.to_string(),
1533                    &format!("{label} '{name}' does not exist"),
1534                    "drop",
1535                ));
1536            }
1537            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1538        }
1539
1540        let actual = self
1541            .inner
1542            .db
1543            .collection_contract(name)
1544            .map(|contract| contract.declared_model)
1545            .map(Ok)
1546            .unwrap_or_else(|| {
1547                polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())
1548            })?;
1549        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1550        self.drop_collection_storage(raw_query, name, label)
1551    }
1552
1553    pub fn execute_truncate(
1554        &self,
1555        raw_query: &str,
1556        query: &TruncateQuery,
1557    ) -> RedDBResult<RuntimeQueryResult> {
1558        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1559        if is_system_schema_name(&query.name) {
1560            return Err(RedDBError::Query("system schema is read-only".to_string()));
1561        }
1562
1563        let label = query
1564            .model
1565            .map(polymorphic_resolver::model_name)
1566            .unwrap_or("collection");
1567        let store = self.inner.db.store();
1568        if store.get_collection(&query.name).is_none() {
1569            if query.if_exists {
1570                return Ok(RuntimeQueryResult::ok_message(
1571                    raw_query.to_string(),
1572                    &format!("{label} '{}' does not exist", query.name),
1573                    "truncate",
1574                ));
1575            }
1576            return Err(RedDBError::NotFound(format!(
1577                "{label} '{}' not found",
1578                query.name
1579            )));
1580        }
1581
1582        let actual =
1583            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1584        if let Some(expected) = query.model {
1585            polymorphic_resolver::ensure_model_match(expected, actual)?;
1586        }
1587
1588        if actual == CollectionModel::Queue {
1589            return self.execute_queue_command(
1590                raw_query,
1591                &QueueCommand::Purge {
1592                    queue: query.name.clone(),
1593                },
1594            );
1595        }
1596
1597        // Count before wiping so we can emit the aggregated truncate event.
1598        let affected = self.truncate_collection_entities(&query.name)?;
1599        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1600        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1601        self.inner.db.invalidate_vector_index(&query.name);
1602        self.clear_table_planner_stats(&query.name);
1603        self.invalidate_result_cache();
1604
1605        Ok(RuntimeQueryResult::ok_message(
1606            raw_query.to_string(),
1607            &format!(
1608                "{affected} entities truncated from {label} '{}'",
1609                query.name
1610            ),
1611            "truncate",
1612        ))
1613    }
1614
1615    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1616        let store = self.inner.db.store();
1617        let Some(manager) = store.get_collection(name) else {
1618            return Ok(0);
1619        };
1620        let entities = manager.query_all(|_| true);
1621        if entities.is_empty() {
1622            return Ok(0);
1623        }
1624
1625        for entity in &entities {
1626            let fields = entity_index_fields(&entity.data);
1627            self.inner
1628                .index_store
1629                .index_entity_delete(name, entity.id, &fields)
1630                .map_err(RedDBError::Internal)?;
1631        }
1632
1633        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1634        let deleted_ids = store
1635            .delete_batch(name, &ids)
1636            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1637        for id in &deleted_ids {
1638            store.context_index().remove_entity(*id);
1639        }
1640        Ok(deleted_ids.len() as u64)
1641    }
1642
1643    fn drop_collection_storage(
1644        &self,
1645        raw_query: &str,
1646        name: &str,
1647        label: &str,
1648    ) -> RedDBResult<RuntimeQueryResult> {
1649        let store = self.inner.db.store();
1650
1651        // Emit 1 collection_dropped event before storage is wiped.
1652        // Queue is preserved; subscription is removed with the contract below.
1653        let final_count = store
1654            .get_collection(name)
1655            .map(|manager| manager.query_all(|_| true).len() as u64)
1656            .unwrap_or(0);
1657        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1658            self,
1659            name,
1660            final_count,
1661        )?;
1662
1663        let orphaned_indices: Vec<String> = self
1664            .inner
1665            .index_store
1666            .list_indices(name)
1667            .into_iter()
1668            .map(|index| index.name)
1669            .collect();
1670        for index_name in &orphaned_indices {
1671            self.inner.index_store.drop_index(index_name, name);
1672        }
1673
1674        store
1675            .drop_collection(name)
1676            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1677        self.inner.db.invalidate_vector_index(name);
1678        self.inner.db.clear_collection_default_ttl_ms(name);
1679        self.inner
1680            .db
1681            .remove_collection_contract(name)
1682            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1683        self.clear_table_planner_stats(name);
1684        self.invalidate_result_cache();
1685        if let Some(store) = self.inner.auth_store.read().clone() {
1686            store.invalidate_visible_collections_cache();
1687        }
1688        self.inner
1689            .db
1690            .persist_metadata()
1691            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1692        self.schema_vocabulary_apply(
1693            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1694                collection: name.to_string(),
1695            },
1696        );
1697
1698        Ok(RuntimeQueryResult::ok_message(
1699            raw_query.to_string(),
1700            &format!("{label} '{name}' dropped"),
1701            "drop",
1702        ))
1703    }
1704}
1705
1706pub(crate) fn is_system_schema_name(name: &str) -> bool {
1707    name == "red" || name.starts_with("red.") || name.starts_with("__red_schema_")
1708}
1709
1710fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1711    match data {
1712        EntityData::Row(row) => {
1713            if let Some(ref named) = row.named {
1714                named
1715                    .iter()
1716                    .map(|(key, value)| (key.clone(), value.clone()))
1717                    .collect()
1718            } else if let Some(ref schema) = row.schema {
1719                schema
1720                    .iter()
1721                    .zip(row.columns.iter())
1722                    .map(|(key, value)| (key.clone(), value.clone()))
1723                    .collect()
1724            } else {
1725                Vec::new()
1726            }
1727        }
1728        EntityData::Node(node) => node
1729            .properties
1730            .iter()
1731            .map(|(key, value)| (key.clone(), value.clone()))
1732            .collect(),
1733        _ => Vec::new(),
1734    }
1735}
1736
1737/// DDL-time gate for a declared AI policy (issue #1271). Each declared
1738/// modality block is checked against the provider capability matrix
1739/// (#1269): a provider/model that cannot serve the requested modality is
1740/// rejected before the collection is created. Uses the built-in registry
1741/// (no per-deployment overrides at DDL time).
1742fn validate_ai_policy_modalities(policy: &crate::catalog::AiPolicy) -> RedDBResult<()> {
1743    use crate::runtime::ai::provider_capabilities::{Modality, Registry};
1744    let registry = Registry::new();
1745    let check = |provider: &str, model: &str, modality: Modality| -> RedDBResult<()> {
1746        registry
1747            .validate_policy_modality(provider, model, modality)
1748            .map_err(|err| RedDBError::Query(err.to_string()))
1749    };
1750    if let Some(embed) = &policy.embed {
1751        check(&embed.provider, &embed.model, Modality::Embed)?;
1752    }
1753    if let Some(moderate) = &policy.moderate {
1754        check(&moderate.provider, &moderate.model, Modality::Moderate)?;
1755    }
1756    if let Some(vision) = &policy.vision {
1757        check(&vision.provider, &vision.model, Modality::Vision)?;
1758    }
1759    Ok(())
1760}
1761
1762fn collection_contract_from_create_table(
1763    query: &CreateTableQuery,
1764) -> RedDBResult<crate::physical::CollectionContract> {
1765    let now = current_unix_ms();
1766    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1767        .columns
1768        .iter()
1769        .map(declared_column_contract_from_ddl)
1770        .collect();
1771    if query.timestamps {
1772        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1773        // columns that the write path populates from
1774        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1775        declared_columns.push(crate::physical::DeclaredColumnContract {
1776            name: "created_at".to_string(),
1777            data_type: "BIGINT".to_string(),
1778            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1779            not_null: true,
1780            default: None,
1781            compress: None,
1782            unique: false,
1783            primary_key: false,
1784            enum_variants: Vec::new(),
1785            array_element: None,
1786            decimal_precision: None,
1787        });
1788        declared_columns.push(crate::physical::DeclaredColumnContract {
1789            name: "updated_at".to_string(),
1790            data_type: "BIGINT".to_string(),
1791            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1792            not_null: true,
1793            default: None,
1794            compress: None,
1795            unique: false,
1796            primary_key: false,
1797            enum_variants: Vec::new(),
1798            array_element: None,
1799            decimal_precision: None,
1800        });
1801    }
1802    Ok(crate::physical::CollectionContract {
1803        name: query.name.clone(),
1804        declared_model: crate::catalog::CollectionModel::Table,
1805        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1806        origin: crate::physical::ContractOrigin::Explicit,
1807        version: 1,
1808        created_at_unix_ms: now,
1809        updated_at_unix_ms: now,
1810        default_ttl_ms: query.default_ttl_ms,
1811        vector_dimension: None,
1812        vector_metric: None,
1813        context_index_fields: query.context_index_fields.clone(),
1814        declared_columns,
1815        table_def: Some(build_table_def_from_create_table(query)?),
1816        timestamps_enabled: query.timestamps,
1817        context_index_enabled: query.context_index_enabled
1818            || !query.context_index_fields.is_empty(),
1819        metrics_raw_retention_ms: None,
1820        metrics_rollup_policies: Vec::new(),
1821        metrics_tenant_identity: None,
1822        metrics_namespace: None,
1823        append_only: query.append_only,
1824        subscriptions: query.subscriptions.clone(),
1825        analytics_config: Vec::new(),
1826        session_key: None,
1827        session_gap_ms: None,
1828        retention_duration_ms: None,
1829        analytical_storage: None,
1830        ai_policy: query.ai_policy.clone(),
1831    })
1832}
1833
1834fn default_collection_contract_for_existing_table(
1835    name: &str,
1836) -> crate::physical::CollectionContract {
1837    let now = current_unix_ms();
1838    crate::physical::CollectionContract {
1839        name: name.to_string(),
1840        declared_model: crate::catalog::CollectionModel::Table,
1841        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1842        origin: crate::physical::ContractOrigin::Explicit,
1843        version: 0,
1844        created_at_unix_ms: now,
1845        updated_at_unix_ms: now,
1846        default_ttl_ms: None,
1847        vector_dimension: None,
1848        vector_metric: None,
1849        context_index_fields: Vec::new(),
1850        declared_columns: Vec::new(),
1851        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1852        timestamps_enabled: false,
1853        context_index_enabled: false,
1854        metrics_raw_retention_ms: None,
1855        metrics_rollup_policies: Vec::new(),
1856        metrics_tenant_identity: None,
1857        metrics_namespace: None,
1858        append_only: false,
1859        subscriptions: Vec::new(),
1860        analytics_config: Vec::new(),
1861        session_key: None,
1862        session_gap_ms: None,
1863        retention_duration_ms: None,
1864        analytical_storage: None,
1865
1866        ai_policy: None,
1867    }
1868}
1869
1870fn keyed_collection_contract(
1871    name: &str,
1872    model: crate::catalog::CollectionModel,
1873    analytics_config: Vec<crate::catalog::AnalyticsViewDescriptor>,
1874) -> crate::physical::CollectionContract {
1875    let now = current_unix_ms();
1876    crate::physical::CollectionContract {
1877        name: name.to_string(),
1878        declared_model: model,
1879        schema_mode: crate::catalog::SchemaMode::Dynamic,
1880        origin: crate::physical::ContractOrigin::Explicit,
1881        version: 1,
1882        created_at_unix_ms: now,
1883        updated_at_unix_ms: now,
1884        default_ttl_ms: None,
1885        vector_dimension: None,
1886        vector_metric: None,
1887        context_index_fields: Vec::new(),
1888        declared_columns: Vec::new(),
1889        table_def: None,
1890        timestamps_enabled: false,
1891        context_index_enabled: false,
1892        metrics_raw_retention_ms: None,
1893        metrics_rollup_policies: Vec::new(),
1894        metrics_tenant_identity: None,
1895        metrics_namespace: None,
1896        append_only: false,
1897        subscriptions: Vec::new(),
1898        analytics_config,
1899        session_key: None,
1900        session_gap_ms: None,
1901        retention_duration_ms: None,
1902        analytical_storage: None,
1903
1904        ai_policy: None,
1905    }
1906}
1907
1908fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1909    let now = current_unix_ms();
1910    crate::physical::CollectionContract {
1911        name: query.name.clone(),
1912        declared_model: crate::catalog::CollectionModel::Metrics,
1913        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1914        origin: crate::physical::ContractOrigin::Explicit,
1915        version: 1,
1916        created_at_unix_ms: now,
1917        updated_at_unix_ms: now,
1918        default_ttl_ms: query.default_ttl_ms,
1919        vector_dimension: None,
1920        vector_metric: None,
1921        context_index_fields: Vec::new(),
1922        declared_columns: Vec::new(),
1923        table_def: None,
1924        timestamps_enabled: false,
1925        context_index_enabled: false,
1926        metrics_raw_retention_ms: query.default_ttl_ms,
1927        metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1928        metrics_tenant_identity: Some(
1929            query
1930                .tenant_by
1931                .clone()
1932                .unwrap_or_else(|| "current_tenant".to_string()),
1933        ),
1934        metrics_namespace: Some("default".to_string()),
1935        append_only: true,
1936        subscriptions: Vec::new(),
1937        analytics_config: Vec::new(),
1938        session_key: None,
1939        session_gap_ms: None,
1940        retention_duration_ms: None,
1941        analytical_storage: None,
1942
1943        ai_policy: None,
1944    }
1945}
1946
1947fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1948    let now = current_unix_ms();
1949    crate::physical::CollectionContract {
1950        name: query.name.clone(),
1951        declared_model: crate::catalog::CollectionModel::Vector,
1952        schema_mode: crate::catalog::SchemaMode::Dynamic,
1953        origin: crate::physical::ContractOrigin::Explicit,
1954        version: 1,
1955        created_at_unix_ms: now,
1956        updated_at_unix_ms: now,
1957        default_ttl_ms: None,
1958        vector_dimension: Some(query.dimension),
1959        vector_metric: Some(query.metric),
1960        context_index_fields: Vec::new(),
1961        declared_columns: Vec::new(),
1962        table_def: None,
1963        timestamps_enabled: false,
1964        context_index_enabled: false,
1965        metrics_raw_retention_ms: None,
1966        metrics_rollup_policies: Vec::new(),
1967        metrics_tenant_identity: None,
1968        metrics_namespace: None,
1969        append_only: false,
1970        subscriptions: Vec::new(),
1971        analytics_config: Vec::new(),
1972        session_key: None,
1973        session_gap_ms: None,
1974        retention_duration_ms: None,
1975        analytical_storage: None,
1976
1977        ai_policy: None,
1978    }
1979}
1980
1981fn declared_column_contract_from_ddl(
1982    column: &CreateColumnDef,
1983) -> crate::physical::DeclaredColumnContract {
1984    crate::physical::DeclaredColumnContract {
1985        name: column.name.clone(),
1986        data_type: column.data_type.clone(),
1987        sql_type: Some(column.sql_type.clone()),
1988        not_null: column.not_null,
1989        default: column.default.clone(),
1990        compress: column.compress,
1991        unique: column.unique,
1992        primary_key: column.primary_key,
1993        enum_variants: column.enum_variants.clone(),
1994        array_element: column.array_element.clone(),
1995        decimal_precision: column.decimal_precision,
1996    }
1997}
1998
1999fn apply_alter_operations_to_contract(
2000    contract: &mut crate::physical::CollectionContract,
2001    operations: &[AlterOperation],
2002) {
2003    if contract.table_def.is_none() {
2004        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
2005    }
2006    for operation in operations {
2007        match operation {
2008            AlterOperation::AddColumn(column) => {
2009                if !contract
2010                    .declared_columns
2011                    .iter()
2012                    .any(|existing| existing.name == column.name)
2013                {
2014                    contract
2015                        .declared_columns
2016                        .push(declared_column_contract_from_ddl(column));
2017                }
2018                if let Some(table_def) = contract.table_def.as_mut() {
2019                    if table_def.get_column(&column.name).is_none() {
2020                        if let Ok(column_def) = column_def_from_ddl(column) {
2021                            if column.primary_key {
2022                                table_def.primary_key.push(column.name.clone());
2023                                table_def.constraints.push(
2024                                    crate::storage::schema::Constraint::new(
2025                                        format!("pk_{}", column.name),
2026                                        crate::storage::schema::ConstraintType::PrimaryKey,
2027                                    )
2028                                    .on_columns(vec![column.name.clone()]),
2029                                );
2030                            }
2031                            if column.unique {
2032                                table_def.constraints.push(
2033                                    crate::storage::schema::Constraint::new(
2034                                        format!("uniq_{}", column.name),
2035                                        crate::storage::schema::ConstraintType::Unique,
2036                                    )
2037                                    .on_columns(vec![column.name.clone()]),
2038                                );
2039                            }
2040                            if column.not_null {
2041                                table_def.constraints.push(
2042                                    crate::storage::schema::Constraint::new(
2043                                        format!("not_null_{}", column.name),
2044                                        crate::storage::schema::ConstraintType::NotNull,
2045                                    )
2046                                    .on_columns(vec![column.name.clone()]),
2047                                );
2048                            }
2049                            table_def.columns.push(column_def);
2050                        }
2051                    }
2052                }
2053            }
2054            AlterOperation::DropColumn(name) => {
2055                contract
2056                    .declared_columns
2057                    .retain(|column| column.name != *name);
2058                if let Some(table_def) = contract.table_def.as_mut() {
2059                    if let Some(index) = table_def.column_index(name) {
2060                        table_def.columns.remove(index);
2061                    }
2062                    table_def.primary_key.retain(|column| column != name);
2063                    table_def.constraints.retain(|constraint| {
2064                        !constraint.columns.iter().any(|column| column == name)
2065                    });
2066                    table_def
2067                        .indexes
2068                        .retain(|index| !index.columns.iter().any(|column| column == name));
2069                }
2070            }
2071            AlterOperation::RenameColumn { from, to } => {
2072                if contract
2073                    .declared_columns
2074                    .iter()
2075                    .any(|column| column.name == *to)
2076                {
2077                    continue;
2078                }
2079                if let Some(column) = contract
2080                    .declared_columns
2081                    .iter_mut()
2082                    .find(|column| column.name == *from)
2083                {
2084                    column.name = to.clone();
2085                }
2086                if let Some(table_def) = contract.table_def.as_mut() {
2087                    if let Some(column) = table_def
2088                        .columns
2089                        .iter_mut()
2090                        .find(|column| column.name == *from)
2091                    {
2092                        column.name = to.clone();
2093                    }
2094                    for primary_key in &mut table_def.primary_key {
2095                        if *primary_key == *from {
2096                            *primary_key = to.clone();
2097                        }
2098                    }
2099                    for constraint in &mut table_def.constraints {
2100                        for column in &mut constraint.columns {
2101                            if *column == *from {
2102                                *column = to.clone();
2103                            }
2104                        }
2105                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
2106                            for column in ref_columns {
2107                                if *column == *from {
2108                                    *column = to.clone();
2109                                }
2110                            }
2111                        }
2112                    }
2113                    for index in &mut table_def.indexes {
2114                        for column in &mut index.columns {
2115                            if *column == *from {
2116                                *column = to.clone();
2117                            }
2118                        }
2119                    }
2120                }
2121            }
2122            // Partition ops don't touch the column contract — metadata is
2123            // persisted separately via `red_config.partition.*`.
2124            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
2125            // RLS toggles don't touch the column contract — flag is persisted
2126            // separately via `red_config.rls.enabled.{table}` and enforced
2127            // through the in-memory `rls_enabled_tables` set.
2128            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
2129            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
2130            // and are enforced through `tenant_tables` + RLS auto-policy.
2131            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
2132            AlterOperation::SetAppendOnly(on) => {
2133                contract.append_only = *on;
2134            }
2135            // VCS opt-in is persisted to red_vcs_settings by the
2136            // executor, not the contract — nothing to do here.
2137            AlterOperation::SetVersioned(_) => {}
2138            AlterOperation::EnableEvents(subscription) => {
2139                let mut subscription = subscription.clone();
2140                subscription.source = contract.name.clone();
2141                subscription.enabled = true;
2142                if let Some(existing) = contract
2143                    .subscriptions
2144                    .iter_mut()
2145                    .find(|existing| existing.target_queue == subscription.target_queue)
2146                {
2147                    *existing = subscription;
2148                } else {
2149                    contract.subscriptions.push(subscription);
2150                }
2151            }
2152            AlterOperation::DisableEvents => {
2153                for subscription in &mut contract.subscriptions {
2154                    subscription.enabled = false;
2155                }
2156            }
2157            AlterOperation::AddSubscription { name, descriptor } => {
2158                let mut sub = descriptor.clone();
2159                sub.name = name.clone();
2160                sub.source = contract.name.clone();
2161                sub.enabled = true;
2162                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
2163                {
2164                    *existing = sub;
2165                } else {
2166                    contract.subscriptions.push(sub);
2167                }
2168            }
2169            AlterOperation::DropSubscription { name } => {
2170                contract.subscriptions.retain(|s| s.name != *name);
2171            }
2172            // Signer registry mutations live in `red_config` outside the
2173            // contract surface — the executor applied them directly via
2174            // `signed_writes_kind::{add,revoke}_signer`. Nothing to fold
2175            // into the column-shaped contract.
2176            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
2177            AlterOperation::SetRetention { duration_ms } => {
2178                contract.retention_duration_ms = Some(*duration_ms);
2179            }
2180            AlterOperation::UnsetRetention => {
2181                contract.retention_duration_ms = None;
2182            }
2183            // Issue #801 — fold analytics lifecycle into the WAL-backed
2184            // `analytics_config` so the change is durable and the
2185            // `<graph>.<output>` resolver picks it up on the next read.
2186            AlterOperation::AddAnalytics(views) => {
2187                for view in views {
2188                    // Idempotent: skip outputs already enabled so a repeated
2189                    // ADD never duplicates state. The first declaration wins —
2190                    // re-declaring options requires DROP then ADD.
2191                    if !contract
2192                        .analytics_config
2193                        .iter()
2194                        .any(|existing| existing.output == view.output)
2195                    {
2196                        contract.analytics_config.push(view.clone());
2197                    }
2198                }
2199            }
2200            AlterOperation::DropAnalytics(output) => {
2201                contract
2202                    .analytics_config
2203                    .retain(|view| view.output != *output);
2204            }
2205        }
2206    }
2207}
2208
2209/// Issue #580 — returns true if the contract carries at least one
2210/// column the retention filter can use as a timestamp anchor:
2211/// either `WITH timestamps = true` (auto `created_at` / `updated_at`)
2212/// or a user-declared column with a temporal data_type.
2213pub(crate) fn retention_timestamp_column_exists(
2214    contract: &crate::physical::CollectionContract,
2215) -> bool {
2216    if contract.timestamps_enabled {
2217        return true;
2218    }
2219    if matches!(
2220        contract.declared_model,
2221        crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
2222    ) {
2223        // Time-series and metrics collections carry an intrinsic
2224        // timestamp axis on every row even without a declared column;
2225        // retention has a natural anchor.
2226        return true;
2227    }
2228    contract
2229        .declared_columns
2230        .iter()
2231        .any(|column| is_temporal_data_type(&column.data_type))
2232}
2233
2234fn is_temporal_data_type(data_type: &str) -> bool {
2235    let upper = data_type.to_ascii_uppercase();
2236    matches!(
2237        upper.as_str(),
2238        "TIMESTAMP" | "TIMESTAMPMS" | "TIMESTAMP_MS" | "DATETIME" | "DATE"
2239    )
2240}
2241
2242fn validate_event_subscriptions(
2243    runtime: &RedDBRuntime,
2244    source: &str,
2245    subscriptions: &[crate::catalog::SubscriptionDescriptor],
2246) -> RedDBResult<()> {
2247    for subscription in subscriptions
2248        .iter()
2249        .filter(|subscription| subscription.enabled)
2250    {
2251        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2252            return Err(RedDBError::Query(
2253                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2254            ));
2255        }
2256        validate_subscription_auth(runtime, source, subscription)?;
2257        if subscription.target_queue == source
2258            || subscription_would_create_cycle(
2259                &runtime.inner.db,
2260                source,
2261                &subscription.target_queue,
2262            )
2263        {
2264            return Err(RedDBError::Query(
2265                "subscription would create cycle".to_string(),
2266            ));
2267        }
2268        audit_subscription_redact_gap(runtime, source, subscription);
2269    }
2270    Ok(())
2271}
2272
2273fn validate_subscription_auth(
2274    runtime: &RedDBRuntime,
2275    source: &str,
2276    subscription: &crate::catalog::SubscriptionDescriptor,
2277) -> RedDBResult<()> {
2278    let auth_store = match runtime.inner.auth_store.read().clone() {
2279        Some(store) => store,
2280        None => return Ok(()),
2281    };
2282    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2283        Some(identity) => identity,
2284        None => return Ok(()),
2285    };
2286    let tenant = crate::runtime::impl_core::current_tenant();
2287    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2288
2289    if auth_store.iam_authorization_enabled() {
2290        let ctx = crate::auth::policies::EvalContext {
2291            principal_tenant: tenant.clone(),
2292            current_tenant: tenant.clone(),
2293            peer_ip: None,
2294            mfa_present: false,
2295            now_ms: crate::auth::now_ms(),
2296            principal_is_admin_role: role == crate::auth::Role::Admin,
2297            principal_is_platform_scoped: principal.tenant.is_none(),
2298        };
2299        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2300        if let Some(t) = tenant.as_deref() {
2301            source_resource = source_resource.with_tenant(t.to_string());
2302        }
2303        if !auth_store.check_policy_authz_with_role(
2304            &principal,
2305            "select",
2306            &source_resource,
2307            &ctx,
2308            role,
2309        ) {
2310            return Err(RedDBError::Query(format!(
2311                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2312                principal, source_resource.kind, source_resource.name
2313            )));
2314        }
2315
2316        let mut target_resource =
2317            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2318        if let Some(t) = tenant.as_deref() {
2319            target_resource = target_resource.with_tenant(t.to_string());
2320        }
2321        if !auth_store.check_policy_authz_with_role(
2322            &principal,
2323            "write",
2324            &target_resource,
2325            &ctx,
2326            role,
2327        ) {
2328            return Err(RedDBError::Query(format!(
2329                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2330                principal, target_resource.kind, target_resource.name
2331            )));
2332        }
2333        return Ok(());
2334    }
2335
2336    let ctx = crate::auth::privileges::AuthzContext {
2337        principal: &username,
2338        effective_role: role,
2339        tenant: tenant.as_deref(),
2340    };
2341    auth_store
2342        .check_grant(
2343            &ctx,
2344            crate::auth::privileges::Action::Select,
2345            &crate::auth::privileges::Resource::table_from_name(source),
2346        )
2347        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2348    auth_store
2349        .check_grant(
2350            &ctx,
2351            crate::auth::privileges::Action::Insert,
2352            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2353        )
2354        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2355    Ok(())
2356}
2357
2358fn audit_subscription_redact_gap(
2359    runtime: &RedDBRuntime,
2360    source: &str,
2361    subscription: &crate::catalog::SubscriptionDescriptor,
2362) {
2363    let auth_store = match runtime.inner.auth_store.read().clone() {
2364        Some(store) if store.iam_authorization_enabled() => store,
2365        _ => return,
2366    };
2367    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2368        Some(identity) => identity,
2369        None => return,
2370    };
2371    let tenant = crate::runtime::impl_core::current_tenant();
2372    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2373    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2374    if missing.is_empty() {
2375        return;
2376    }
2377
2378    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2379    tracing::warn!(
2380        target: "reddb::operator",
2381        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2382        source,
2383        subscription.target_queue,
2384        columns
2385    );
2386    let mut event = AuditEvent::builder("subscription_redact_gap")
2387        .principal(username)
2388        .source(AuditAuthSource::System)
2389        .resource(format!(
2390            "subscription:{}->{}",
2391            source, subscription.target_queue
2392        ))
2393        .outcome(Outcome::Success)
2394        .field(AuditFieldEscaper::field("source", source))
2395        .field(AuditFieldEscaper::field(
2396            "target_queue",
2397            subscription.target_queue.clone(),
2398        ))
2399        .field(AuditFieldEscaper::field(
2400            "subscription",
2401            subscription.name.clone(),
2402        ))
2403        .field(AuditFieldEscaper::field("columns", columns))
2404        .field(AuditFieldEscaper::field("role", role.as_str()));
2405    if let Some(t) = tenant {
2406        event = event.tenant(t);
2407    }
2408    runtime.inner.audit_log.record_event(event.build());
2409}
2410
2411fn subscription_redact_gap_columns(
2412    auth_store: &crate::auth::store::AuthStore,
2413    principal: &crate::auth::UserId,
2414    source: &str,
2415    subscription: &crate::catalog::SubscriptionDescriptor,
2416) -> BTreeSet<String> {
2417    let redacted: HashSet<String> = subscription
2418        .redact_fields
2419        .iter()
2420        .map(|field| field.to_ascii_lowercase())
2421        .collect();
2422    auth_store
2423        .effective_policies(principal)
2424        .iter()
2425        .flat_map(|policy| policy.statements.iter())
2426        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2427        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2428        .flat_map(|statement| statement.resources.iter())
2429        .filter_map(|resource| denied_column_for_source(resource, source))
2430        .filter(|column| !redact_covers_column(&redacted, source, column))
2431        .collect()
2432}
2433
2434fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2435    match pattern {
2436        crate::auth::policies::ActionPattern::Wildcard => true,
2437        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2438        crate::auth::policies::ActionPattern::Prefix(prefix) => {
2439            "select".len() > prefix.len() + 1
2440                && "select".starts_with(prefix)
2441                && "select".as_bytes()[prefix.len()] == b':'
2442        }
2443    }
2444}
2445
2446fn denied_column_for_source(
2447    resource: &crate::auth::policies::ResourcePattern,
2448    source: &str,
2449) -> Option<String> {
2450    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2451        return None;
2452    };
2453    if kind != "column" {
2454        return None;
2455    }
2456    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2457    (column.table_resource_name() == source).then_some(column.column)
2458}
2459
2460fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
2461    let column = column.to_ascii_lowercase();
2462    let qualified = format!("{}.{}", source.to_ascii_lowercase(), column);
2463    redacted.contains("*") || redacted.contains(&column) || redacted.contains(&qualified)
2464}
2465
2466fn subscription_would_create_cycle(
2467    db: &crate::storage::unified::devx::RedDB,
2468    source: &str,
2469    target: &str,
2470) -> bool {
2471    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2472    for contract in db.collection_contracts() {
2473        for subscription in contract
2474            .subscriptions
2475            .into_iter()
2476            .filter(|subscription| subscription.enabled)
2477        {
2478            graph
2479                .entry(subscription.source)
2480                .or_default()
2481                .push(subscription.target_queue);
2482        }
2483    }
2484    graph
2485        .entry(source.to_string())
2486        .or_default()
2487        .push(target.to_string());
2488
2489    let mut stack = vec![target.to_string()];
2490    let mut seen = HashSet::new();
2491    while let Some(node) = stack.pop() {
2492        if node == source {
2493            return true;
2494        }
2495        if !seen.insert(node.clone()) {
2496            continue;
2497        }
2498        if let Some(next) = graph.get(&node) {
2499            stack.extend(next.iter().cloned());
2500        }
2501    }
2502    false
2503}
2504
2505pub(crate) fn ensure_event_target_queue_pub(
2506    runtime: &RedDBRuntime,
2507    queue: &str,
2508) -> RedDBResult<()> {
2509    ensure_event_target_queue(runtime, queue)
2510}
2511
2512fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2513    let store = runtime.inner.db.store();
2514    if store.get_collection(queue).is_some() {
2515        return Ok(());
2516    }
2517    store
2518        .create_collection(queue)
2519        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2520    runtime
2521        .inner
2522        .db
2523        .save_collection_contract(event_queue_collection_contract(queue))
2524        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2525    store.set_config_tree(
2526        &format!("queue.{queue}.mode"),
2527        &crate::serde_json::Value::String("fanout".to_string()),
2528    );
2529    Ok(())
2530}
2531
2532fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2533    let now = current_unix_ms();
2534    crate::physical::CollectionContract {
2535        name: queue.to_string(),
2536        declared_model: crate::catalog::CollectionModel::Queue,
2537        schema_mode: crate::catalog::SchemaMode::Dynamic,
2538        origin: crate::physical::ContractOrigin::Implicit,
2539        version: 1,
2540        created_at_unix_ms: now,
2541        updated_at_unix_ms: now,
2542        default_ttl_ms: None,
2543        vector_dimension: None,
2544        vector_metric: None,
2545        context_index_fields: Vec::new(),
2546        declared_columns: Vec::new(),
2547        table_def: None,
2548        timestamps_enabled: false,
2549        context_index_enabled: false,
2550        metrics_raw_retention_ms: None,
2551        metrics_rollup_policies: Vec::new(),
2552        metrics_tenant_identity: None,
2553        metrics_namespace: None,
2554        append_only: true,
2555        subscriptions: Vec::new(),
2556        analytics_config: Vec::new(),
2557        session_key: None,
2558        session_gap_ms: None,
2559        retention_duration_ms: None,
2560        analytical_storage: None,
2561
2562        ai_policy: None,
2563    }
2564}
2565
2566fn build_table_def_from_create_table(
2567    query: &CreateTableQuery,
2568) -> RedDBResult<crate::storage::schema::TableDef> {
2569    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2570    for column in &query.columns {
2571        if column.primary_key {
2572            table.primary_key.push(column.name.clone());
2573            table.constraints.push(
2574                crate::storage::schema::Constraint::new(
2575                    format!("pk_{}", column.name),
2576                    crate::storage::schema::ConstraintType::PrimaryKey,
2577                )
2578                .on_columns(vec![column.name.clone()]),
2579            );
2580        }
2581        if column.unique {
2582            table.constraints.push(
2583                crate::storage::schema::Constraint::new(
2584                    format!("uniq_{}", column.name),
2585                    crate::storage::schema::ConstraintType::Unique,
2586                )
2587                .on_columns(vec![column.name.clone()]),
2588            );
2589        }
2590        if column.not_null {
2591            table.constraints.push(
2592                crate::storage::schema::Constraint::new(
2593                    format!("not_null_{}", column.name),
2594                    crate::storage::schema::ConstraintType::NotNull,
2595                )
2596                .on_columns(vec![column.name.clone()]),
2597            );
2598        }
2599        table.columns.push(column_def_from_ddl(column)?);
2600    }
2601    // WITH timestamps = true: append the two runtime-managed columns
2602    // to the schema so resolved_contract_columns exposes them to the
2603    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
2604    // not-nullable; the write path auto-fills them.
2605    if query.timestamps {
2606        table.columns.push(
2607            crate::storage::schema::ColumnDef::new(
2608                "created_at".to_string(),
2609                crate::storage::schema::DataType::UnsignedInteger,
2610            )
2611            .not_null(),
2612        );
2613        table.columns.push(
2614            crate::storage::schema::ColumnDef::new(
2615                "updated_at".to_string(),
2616                crate::storage::schema::DataType::UnsignedInteger,
2617            )
2618            .not_null(),
2619        );
2620        table.constraints.push(
2621            crate::storage::schema::Constraint::new(
2622                "not_null_created_at".to_string(),
2623                crate::storage::schema::ConstraintType::NotNull,
2624            )
2625            .on_columns(vec!["created_at".to_string()]),
2626        );
2627        table.constraints.push(
2628            crate::storage::schema::Constraint::new(
2629                "not_null_updated_at".to_string(),
2630                crate::storage::schema::ConstraintType::NotNull,
2631            )
2632            .on_columns(vec!["updated_at".to_string()]),
2633        );
2634    }
2635    table
2636        .validate()
2637        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2638    Ok(table)
2639}
2640
2641fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2642    let data_type = resolve_declared_data_type(&column.data_type)
2643        .map_err(|err| RedDBError::Query(err.to_string()))?;
2644    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2645    if column.not_null {
2646        column_def = column_def.not_null();
2647    }
2648    if let Some(default) = &column.default {
2649        column_def = column_def.with_default(default.as_bytes().to_vec());
2650    }
2651    if column.compress.unwrap_or(0) > 0 {
2652        column_def = column_def.compressed();
2653    }
2654    if !column.enum_variants.is_empty() {
2655        column_def = column_def.with_variants(column.enum_variants.clone());
2656    }
2657    if let Some(precision) = column.decimal_precision {
2658        column_def = column_def.with_precision(precision);
2659    }
2660    if let Some(element_type) = &column.array_element {
2661        column_def = column_def.with_element_type(
2662            resolve_declared_data_type(element_type)
2663                .map_err(|err| RedDBError::Query(err.to_string()))?,
2664        );
2665    }
2666    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2667    if column.unique {
2668        column_def = column_def.with_metadata("unique", "true");
2669    }
2670    if column.primary_key {
2671        column_def = column_def.with_metadata("primary_key", "true");
2672    }
2673    Ok(column_def)
2674}
2675
2676fn current_unix_ms() -> u128 {
2677    std::time::SystemTime::now()
2678        .duration_since(std::time::UNIX_EPOCH)
2679        .unwrap_or_default()
2680        .as_millis()
2681}
2682
2683#[cfg(test)]
2684mod tests {
2685    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
2686    use crate::auth::store::{AuthStore, PrincipalRef};
2687    use crate::auth::UserId;
2688    use crate::auth::{AuthConfig, Role};
2689    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
2690    use crate::storage::schema::Value;
2691    use crate::{RedDBOptions, RedDBRuntime};
2692    use std::sync::Arc;
2693
2694    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
2695        Policy {
2696            id: id.to_string(),
2697            version: 1,
2698            tenant: None,
2699            created_at: 0,
2700            updated_at: 0,
2701            statements: vec![Statement {
2702                sid: None,
2703                effect: Effect::Allow,
2704                actions: vec![ActionPattern::Exact(action.to_string())],
2705                resources: vec![ResourcePattern::Exact {
2706                    kind: "collection".to_string(),
2707                    name: collection.to_string(),
2708                }],
2709                condition: None,
2710            }],
2711        }
2712    }
2713
2714    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
2715        let store = Arc::new(AuthStore::new(AuthConfig::default()));
2716        *rt.inner.auth_store.write() = Some(store.clone());
2717        store
2718    }
2719
2720    #[test]
2721    fn drop_denied_without_iam_policy() {
2722        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2723        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
2724        let store = wire_auth_store(&rt);
2725        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
2726        let select_only = Policy {
2727            id: "select-only".to_string(),
2728            version: 1,
2729            tenant: None,
2730            created_at: 0,
2731            updated_at: 0,
2732            statements: vec![Statement {
2733                sid: None,
2734                effect: Effect::Allow,
2735                actions: vec![ActionPattern::Exact("select".to_string())],
2736                resources: vec![ResourcePattern::Wildcard],
2737                condition: None,
2738            }],
2739        };
2740        store.put_policy_internal(select_only).unwrap();
2741        let alice = UserId::from_parts(None, "alice");
2742        store
2743            .attach_policy(PrincipalRef::User(alice), "select-only")
2744            .unwrap();
2745        set_current_auth_identity("alice".to_string(), Role::Write);
2746        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
2747        clear_current_auth_identity();
2748        assert!(
2749            format!("{err}").contains("denied by IAM policy"),
2750            "got: {err}"
2751        );
2752    }
2753
2754    #[test]
2755    fn drop_allowed_with_explicit_iam_policy() {
2756        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2757        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
2758        let store = wire_auth_store(&rt);
2759        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
2760        store.put_policy_internal(policy).unwrap();
2761        let bob = UserId::from_parts(None, "bob");
2762        store
2763            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
2764            .unwrap();
2765        set_current_auth_identity("bob".to_string(), Role::Write);
2766        rt.execute_query("DROP TABLE bar").unwrap();
2767        clear_current_auth_identity();
2768    }
2769
2770    #[test]
2771    fn drop_allowed_with_wildcard_iam_policy() {
2772        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2773        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
2774        let store = wire_auth_store(&rt);
2775        let policy = Policy {
2776            id: "allow-drop-all".to_string(),
2777            version: 1,
2778            tenant: None,
2779            created_at: 0,
2780            updated_at: 0,
2781            statements: vec![Statement {
2782                sid: None,
2783                effect: Effect::Allow,
2784                actions: vec![ActionPattern::Exact("drop".to_string())],
2785                resources: vec![ResourcePattern::Wildcard],
2786                condition: None,
2787            }],
2788        };
2789        store.put_policy_internal(policy).unwrap();
2790        let carl = UserId::from_parts(None, "carl");
2791        store
2792            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
2793            .unwrap();
2794        set_current_auth_identity("carl".to_string(), Role::Write);
2795        rt.execute_query("DROP TABLE baz").unwrap();
2796        clear_current_auth_identity();
2797    }
2798
2799    #[test]
2800    fn truncate_denied_without_iam_policy() {
2801        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2802        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
2803        let store = wire_auth_store(&rt);
2804        // Acceptance #2 (#712 / S5A): in `policy_only` mode a
2805        // principal with no matching policy is denied even if their
2806        // role would have permitted the action. The pre-#712 default
2807        // of "no policy → deny" only holds under `policy_only`, so
2808        // pin the mode explicitly here so the assertion holds
2809        // regardless of the construction-time default.
2810        store
2811            .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
2812        // A policy exists (IAM active) but gives no truncate right.
2813        let select_only = Policy {
2814            id: "select-only-2".to_string(),
2815            version: 1,
2816            tenant: None,
2817            created_at: 0,
2818            updated_at: 0,
2819            statements: vec![Statement {
2820                sid: None,
2821                effect: Effect::Allow,
2822                actions: vec![ActionPattern::Exact("select".to_string())],
2823                resources: vec![ResourcePattern::Wildcard],
2824                condition: None,
2825            }],
2826        };
2827        store.put_policy_internal(select_only).unwrap();
2828        let dana = UserId::from_parts(None, "dana");
2829        store
2830            .attach_policy(PrincipalRef::User(dana), "select-only-2")
2831            .unwrap();
2832        set_current_auth_identity("dana".to_string(), Role::Write);
2833        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
2834        clear_current_auth_identity();
2835        assert!(
2836            format!("{err}").contains("denied by IAM policy"),
2837            "got: {err}"
2838        );
2839    }
2840
2841    #[test]
2842    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
2843        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2844        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
2845            .unwrap();
2846        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
2847            .unwrap();
2848        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
2849            .unwrap();
2850
2851        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
2852        assert_eq!(truncated.statement_type, "truncate");
2853        assert_eq!(truncated.affected_rows, 0);
2854
2855        let empty = rt.execute_query("SELECT id FROM users").unwrap();
2856        assert!(empty.result.records.is_empty());
2857
2858        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
2859            .unwrap();
2860        let selected = rt
2861            .execute_query("SELECT name FROM users WHERE id = 3")
2862            .unwrap();
2863        let name = selected.result.records[0].get("name").unwrap();
2864        assert_eq!(name, &Value::text("cy"));
2865        assert!(rt.db().collection_contract("users").is_some());
2866        assert!(rt
2867            .inner
2868            .index_store
2869            .list_indices("users")
2870            .iter()
2871            .any(|index| index.name == "idx_users_id"));
2872    }
2873
2874    #[test]
2875    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
2876        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2877        rt.execute_query("CREATE QUEUE tasks").unwrap();
2878        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();
2879
2880        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
2881        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));
2882
2883        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
2884        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
2885        assert_eq!(
2886            len.result.records[0].get("len"),
2887            Some(&Value::UnsignedInteger(0))
2888        );
2889    }
2890
2891    #[test]
2892    fn truncate_system_schema_is_read_only() {
2893        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2894        let err = rt
2895            .execute_query("TRUNCATE COLLECTION red.collections")
2896            .unwrap_err();
2897        assert!(format!("{err}").contains("system schema is read-only"));
2898    }
2899
2900    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────
2901
2902    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2903        let result = rt
2904            .execute_query(&format!("QUEUE PEEK {queue} 100"))
2905            .expect("peek queue");
2906        result
2907            .result
2908            .records
2909            .iter()
2910            .map(
2911                |record| match record.get("payload").expect("payload column") {
2912                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2913                    other => panic!("expected JSON queue payload, got {other:?}"),
2914                },
2915            )
2916            .collect()
2917    }
2918
2919    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
2920    /// `truncate` event, not one delete event per row.
2921    #[test]
2922    fn truncate_event_enabled_table_emits_single_truncate_event() {
2923        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2924        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2925            .unwrap();
2926        rt.execute_query(
2927            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
2928        )
2929        .unwrap();
2930
2931        // Drain the 3 insert events so we start clean.
2932        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2933
2934        rt.execute_query("TRUNCATE TABLE users").unwrap();
2935
2936        let events = queue_payloads(&rt, "users_events");
2937        // Must be exactly 1 truncate event, not 3 delete events.
2938        assert_eq!(
2939            events.len(),
2940            1,
2941            "expected 1 truncate event, got {}",
2942            events.len()
2943        );
2944        let ev = events[0].as_object().expect("event is object");
2945        assert_eq!(
2946            ev.get("op").and_then(crate::json::Value::as_str),
2947            Some("truncate")
2948        );
2949        assert_eq!(
2950            ev.get("collection").and_then(crate::json::Value::as_str),
2951            Some("users")
2952        );
2953        assert_eq!(
2954            ev.get("entities_count")
2955                .and_then(crate::json::Value::as_u64),
2956            Some(3)
2957        );
2958        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
2959        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
2960        assert!(ev
2961            .get("event_id")
2962            .and_then(crate::json::Value::as_str)
2963            .is_some_and(|s| !s.is_empty()));
2964    }
2965
2966    /// `TRUNCATE users` on a collection without event subscription emits no events.
2967    #[test]
2968    fn truncate_no_events_collection_emits_nothing() {
2969        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2970        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
2971            .unwrap();
2972        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
2973            .unwrap();
2974        // No EVENTS subscription — truncate must work without touching any queue.
2975        rt.execute_query("TRUNCATE TABLE plain").unwrap();
2976        // No crash, no queue to check. Just verify truncation happened.
2977        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
2978        assert!(rows.result.records.is_empty());
2979    }
2980
2981    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
2982    /// `collection_dropped` event. The subscription is removed from the
2983    /// source contract but the target queue is preserved for consumer drain.
2984    #[test]
2985    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
2986        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2987        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
2988            .unwrap();
2989        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
2990            .unwrap();
2991
2992        // Drain insert events so we start clean.
2993        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();
2994
2995        rt.execute_query("DROP TABLE users").unwrap();
2996
2997        // Queue must still exist with 1 collection_dropped event.
2998        let events = queue_payloads(&rt, "users_events");
2999        assert_eq!(
3000            events.len(),
3001            1,
3002            "expected 1 collection_dropped event, got {}",
3003            events.len()
3004        );
3005        let ev = events[0].as_object().expect("event is object");
3006        assert_eq!(
3007            ev.get("op").and_then(crate::json::Value::as_str),
3008            Some("collection_dropped")
3009        );
3010        assert_eq!(
3011            ev.get("collection").and_then(crate::json::Value::as_str),
3012            Some("users")
3013        );
3014        assert_eq!(
3015            ev.get("final_entities_count")
3016                .and_then(crate::json::Value::as_u64),
3017            Some(2)
3018        );
3019        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
3020        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
3021        assert!(ev
3022            .get("event_id")
3023            .and_then(crate::json::Value::as_str)
3024            .is_some_and(|s| !s.is_empty()));
3025
3026        // Source collection is gone.
3027        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
3028        assert!(
3029            format!("{err}").contains("users"),
3030            "expected not-found error"
3031        );
3032    }
3033
3034    /// `DROP TABLE users` on a collection without event subscription works
3035    /// normally with no event emitted.
3036    #[test]
3037    fn drop_no_events_collection_emits_nothing() {
3038        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3039        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
3040            .unwrap();
3041        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
3042            .unwrap();
3043        rt.execute_query("DROP TABLE plain").unwrap();
3044        // No crash and collection is gone.
3045        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
3046        assert!(format!("{err}").contains("plain"));
3047    }
3048
3049    // ── #297: ops_filter + WHERE filter ────────────────────────────────────
3050
3051    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
3052    #[test]
3053    fn ops_filter_insert_only_ignores_update_and_delete() {
3054        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3055        rt.execute_query(
3056            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
3057        )
3058        .unwrap();
3059        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
3060            .unwrap();
3061        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
3062            .unwrap();
3063        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();
3064
3065        let events = queue_payloads(&rt, "items_events");
3066        // Only the INSERT should have fired.
3067        assert_eq!(
3068            events.len(),
3069            1,
3070            "expected 1 insert event, got {}",
3071            events.len()
3072        );
3073        assert_eq!(
3074            events[0]
3075                .as_object()
3076                .unwrap()
3077                .get("op")
3078                .and_then(crate::json::Value::as_str),
3079            Some("insert")
3080        );
3081    }
3082
3083    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
3084    #[test]
3085    fn where_filter_skips_rows_that_do_not_match() {
3086        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3087        rt.execute_query(
3088            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
3089        )
3090        .unwrap();
3091
3092        // This row should generate an event.
3093        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
3094            .unwrap();
3095        // This row should NOT generate an event.
3096        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
3097            .unwrap();
3098
3099        let events = queue_payloads(&rt, "users_events");
3100        assert_eq!(
3101            events.len(),
3102            1,
3103            "expected 1 event (only active), got {}",
3104            events.len()
3105        );
3106        let ev = events[0].as_object().unwrap();
3107        assert_eq!(
3108            ev.get("op").and_then(crate::json::Value::as_str),
3109            Some("insert")
3110        );
3111        let after = ev.get("after").unwrap().as_object().unwrap();
3112        assert_eq!(
3113            after.get("status").and_then(crate::json::Value::as_str),
3114            Some("active")
3115        );
3116    }
3117
3118    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
3119    #[test]
3120    fn ops_filter_and_where_filter_combined() {
3121        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3122        rt.execute_query(
3123            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
3124        )
3125        .unwrap();
3126
3127        // INSERT active → event
3128        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
3129            .unwrap();
3130        // INSERT inactive → no event
3131        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
3132            .unwrap();
3133        // UPDATE row 1 to inactive → after = inactive, no event
3134        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
3135            .unwrap();
3136        // DELETE → ops_filter excludes it
3137        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
3138
3139        let events = queue_payloads(&rt, "items_events");
3140        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
3141        assert_eq!(
3142            events.len(),
3143            1,
3144            "expected 1 event, got {}: {events:?}",
3145            events.len()
3146        );
3147        assert_eq!(
3148            events[0]
3149                .as_object()
3150                .unwrap()
3151                .get("op")
3152                .and_then(crate::json::Value::as_str),
3153            Some("insert")
3154        );
3155    }
3156
3157    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
3158    #[test]
3159    fn where_filter_on_delete_checks_before_state() {
3160        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3161        rt.execute_query(
3162            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
3163        )
3164        .unwrap();
3165
3166        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
3167            .unwrap();
3168
3169        // Delete active row → event (before-state was active)
3170        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3171        // Delete inactive row → no event (before-state was inactive)
3172        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();
3173
3174        let events = queue_payloads(&rt, "users_events");
3175        assert_eq!(
3176            events.len(),
3177            1,
3178            "expected 1 delete event, got {}",
3179            events.len()
3180        );
3181        let ev = events[0].as_object().unwrap();
3182        assert_eq!(
3183            ev.get("op").and_then(crate::json::Value::as_str),
3184            Some("delete")
3185        );
3186    }
3187
3188    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────
3189
3190    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
3191    #[test]
3192    fn alter_add_column_on_event_enabled_table_succeeds() {
3193        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3194        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
3195            .unwrap();
3196        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
3197        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
3198            .unwrap();
3199        // The column is now in the contract.
3200        let contract = rt.db().collection_contract("users").unwrap();
3201        assert!(
3202            contract.declared_columns.iter().any(|c| c.name == "phone"),
3203            "phone column should be in contract"
3204        );
3205        // Subscription still enabled after the alter.
3206        assert!(
3207            contract.subscriptions.iter().any(|s| s.enabled),
3208            "subscription should remain enabled"
3209        );
3210    }
3211
3212    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
3213    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
3214    #[test]
3215    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
3216        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3217        rt.execute_query(
3218            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
3219        )
3220        .unwrap();
3221        // DROP COLUMN — schema change event path exercises, must not error.
3222        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
3223            .unwrap();
3224        let contract = rt.db().collection_contract("items").unwrap();
3225        assert!(
3226            !contract.declared_columns.iter().any(|c| c.name == "secret"),
3227            "secret column should be removed"
3228        );
3229        // ENABLE RLS — non-column op, no schema-change event (coverage).
3230        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
3231            .unwrap();
3232        // Collection and subscription still intact.
3233        assert!(
3234            contract.subscriptions.iter().any(|s| s.enabled),
3235            "subscription should remain enabled"
3236        );
3237    }
3238
3239    /// Slice G (#675) — every newly-created `CollectionModel::Vector`
3240    /// collection is marked `vector.turbo` and gains a materialised
3241    /// `TurboCollectionState`. The marker is what the SEARCH/INSERT
3242    /// hot paths read to branch between TurboQuant and the legacy
3243    /// brute-force fallback.
3244    #[test]
3245    fn create_vector_marks_collection_as_turbo_baseline() {
3246        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3247        rt.execute_query("CREATE VECTOR embeddings DIM 4").unwrap();
3248        let store = rt.db().store();
3249        assert!(
3250            crate::runtime::vector_turbo_kind::is_turbo(&store, "embeddings"),
3251            "new vector collections must be turbo-marked baseline"
3252        );
3253        assert!(
3254            rt.db().turbo_state("embeddings").is_some(),
3255            "turbo_state must materialise after CREATE VECTOR"
3256        );
3257    }
3258
3259    /// Non-vector collections must NOT be turbo-marked. Guards against
3260    /// the always-baseline change accidentally leaking the marker onto
3261    /// CREATE TABLE / CREATE DOCUMENT / etc.
3262    #[test]
3263    fn create_table_does_not_mark_turbo() {
3264        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3265        rt.execute_query("CREATE TABLE plain (id INT)").unwrap();
3266        let store = rt.db().store();
3267        assert!(
3268            !crate::runtime::vector_turbo_kind::is_turbo(&store, "plain"),
3269            "non-vector collections must not gain the turbo marker"
3270        );
3271        assert!(rt.db().turbo_state("plain").is_none());
3272    }
3273
3274    /// `CREATE COLLECTION KIND vector.turbo` continues to mark the
3275    /// collection (idempotent with the new always-baseline path).
3276    #[test]
3277    fn create_collection_kind_vector_turbo_still_marked() {
3278        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3279        rt.execute_query("CREATE COLLECTION turbo_v KIND vector.turbo DIM 4")
3280            .unwrap();
3281        let store = rt.db().store();
3282        assert!(crate::runtime::vector_turbo_kind::is_turbo(
3283            &store, "turbo_v"
3284        ));
3285        assert!(rt.db().turbo_state("turbo_v").is_some());
3286    }
3287
3288    /// Issue #1271: a declared AI policy is persisted in the catalog and
3289    /// reads back identical via the collection contract (introspection).
3290    #[test]
3291    fn create_table_persists_and_introspects_ai_policy() {
3292        use crate::catalog::{ModerateDegradedMode, ModerateRejectAction};
3293        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3294        rt.execute_query(
3295            "CREATE TABLE posts (id INT, title TEXT, body TEXT, photo TEXT) WITH ( \
3296               EMBED (fields = ('title', 'body'), provider = 'openai', model = 'text-embedding-3-small'), \
3297               MODERATE (fields = ('body'), provider = 'openai', model = 'omni-moderation-latest', sync = true, degraded = closed, on_reject = flag), \
3298               VISION (image_field = 'photo', outputs = ('caption'), provider = 'openai', model = 'gpt-4o') \
3299             )",
3300        )
3301        .expect("create table with ai policy");
3302
3303        let contracts = rt.db().collection_contracts();
3304        let contract = contracts
3305            .iter()
3306            .find(|c| c.name == "posts")
3307            .expect("contract persisted");
3308        let policy = contract.ai_policy.as_ref().expect("ai policy persisted");
3309
3310        let embed = policy.embed.as_ref().expect("embed");
3311        assert_eq!(embed.fields, vec!["title".to_string(), "body".to_string()]);
3312        assert_eq!(embed.model, "text-embedding-3-small");
3313
3314        let moderate = policy.moderate.as_ref().expect("moderate");
3315        assert!(moderate.sync_gate);
3316        assert_eq!(moderate.degraded_mode, ModerateDegradedMode::Closed);
3317        assert_eq!(moderate.reject_action, ModerateRejectAction::Flag);
3318
3319        let vision = policy.vision.as_ref().expect("vision");
3320        assert_eq!(vision.image_field, "photo");
3321        assert_eq!(vision.model, "gpt-4o");
3322    }
3323
3324    /// Issue #1271 / #1269: a policy wiring a provider to a modality it
3325    /// cannot serve is rejected at DDL time, and the collection is not
3326    /// created. Anthropic has no embeddings product in the matrix.
3327    #[test]
3328    fn create_table_rejects_ai_policy_incapable_provider() {
3329        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3330        let err = rt
3331            .execute_query(
3332                "CREATE TABLE bad (id INT, body TEXT) WITH ( \
3333                   EMBED (fields = ('body'), provider = 'anthropic', model = 'claude-3-5-sonnet') \
3334                 )",
3335            )
3336            .unwrap_err();
3337        assert!(
3338            err.to_string()
3339                .contains("cannot serve the 'embed' modality"),
3340            "{err}"
3341        );
3342        assert!(
3343            rt.db().store().get_collection("bad").is_none(),
3344            "rejected DDL must not create the collection"
3345        );
3346    }
3347}