Skip to main content

reddb_server/runtime/
impl_ddl.rs

1//! DDL execution: CREATE TABLE, DROP TABLE, ALTER TABLE via SQL AST
2//!
3//! Translates DDL statements into collection-level operations on the
4//! underlying `UnifiedStore`.  RedDB uses a flexible schema-on-read
5//! model, so column definitions are advisory metadata rather than
6//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Builds the vault key-value reference under which a collection's own
/// master key is stored: `red.vault.{collection}.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        crate::reserved_fields::ensure_no_reserved_public_item_fields(
37            query.columns.iter().map(|column| column.name.as_str()),
38            &format!("table '{}'", query.name),
39        )?;
40        // Check if the collection already exists.
41        let exists = store.get_collection(&query.name).is_some();
42        if exists {
43            if query.if_not_exists {
44                return Ok(RuntimeQueryResult::ok_message(
45                    raw_query.to_string(),
46                    &format!("table '{}' already exists", query.name),
47                    "create",
48                ));
49            }
50            return Err(RedDBError::Query(format!(
51                "table '{}' already exists",
52                query.name
53            )));
54        }
55
56        // Build and validate the contract before mutating storage so invalid
57        // SQL types / duplicate columns do not leave partial side effects.
58        let contract = collection_contract_from_create_table(query)?;
59        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60        // Create the collection.
61        store
62            .create_collection(&query.name)
63            .map_err(|err| RedDBError::Internal(err.to_string()))?;
64        for subscription in &contract.subscriptions {
65            ensure_event_target_queue(self, &subscription.target_queue)?;
66        }
67        if let Some(default_ttl_ms) = query.default_ttl_ms {
68            self.inner
69                .db
70                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71        }
72        self.inner
73            .db
74            .save_collection_contract(contract)
75            .map_err(|err| RedDBError::Internal(err.to_string()))?;
76        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77            store.set_config_tree(
78                &format!("red.collection_tenants.{}", query.name),
79                &crate::serde_json::Value::String(tenant_id),
80            );
81        }
82        self.inner
83            .db
84            .persist_metadata()
85            .map_err(|err| RedDBError::Internal(err.to_string()))?;
86        self.refresh_table_planner_stats(&query.name);
87        self.invalidate_result_cache();
88        // Issue #120 — feed the create into the schema-vocabulary
89        // reverse index so AskPipeline (#121) sees this collection.
90        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91        self.schema_vocabulary_apply(
92            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93                collection: query.name.clone(),
94                columns,
95                type_tags: Vec::new(),
96                description: None,
97            },
98        );
99        // Partition metadata (Phase 2.2 PG parity).
100        //
101        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
102        // clause, stamp the partition config into `red_config` under
103        // `partition.{table}.{by,column}`. Children are registered separately
104        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
105        if let Some(spec) = &query.partition_by {
106            let kind_str = match spec.kind {
107                crate::storage::query::ast::PartitionKind::Range => "range",
108                crate::storage::query::ast::PartitionKind::List => "list",
109                crate::storage::query::ast::PartitionKind::Hash => "hash",
110            };
111            store.set_config_tree(
112                &format!("partition.{}.by", query.name),
113                &crate::serde_json::Value::String(kind_str.to_string()),
114            );
115            store.set_config_tree(
116                &format!("partition.{}.column", query.name),
117                &crate::serde_json::Value::String(spec.column.clone()),
118            );
119        }
120
121        // Table-scoped multi-tenancy (Phase 2.5.4).
122        //
123        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
124        //   1. Persists the `tenant_tables.{table}.column` marker so
125        //      INSERTs can auto-fill and future opens re-hydrate.
126        //   2. Registers the table in the in-memory `tenant_tables`
127        //      HashMap used by the DML auto-fill path.
128        //   3. Installs an implicit RLS policy equivalent to
129        //      `USING (col = CURRENT_TENANT())` across all actions.
130        //   4. Flips `rls_enabled_tables` on so the policy applies.
131        if let Some(col) = &query.tenant_by {
132            store.set_config_tree(
133                &format!("tenant_tables.{}.column", query.name),
134                &crate::serde_json::Value::String(col.clone()),
135            );
136            self.register_tenant_table(&query.name, col);
137        }
138
139        let ttl_suffix = query
140            .default_ttl_ms
141            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142            .unwrap_or_default();
143
144        let tenant_suffix = query
145            .tenant_by
146            .as_ref()
147            .map(|col| format!(" (tenant-scoped by {col})"))
148            .unwrap_or_default();
149
150        Ok(RuntimeQueryResult::ok_message(
151            raw_query.to_string(),
152            &format!(
153                "table '{}' created{}{}",
154                query.name, ttl_suffix, tenant_suffix
155            ),
156            "create",
157        ))
158    }
159
160    fn execute_create_keyed_collection(
161        &self,
162        raw_query: &str,
163        query: &CreateTableQuery,
164    ) -> RedDBResult<RuntimeQueryResult> {
165        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166        if is_system_schema_name(&query.name) {
167            return Err(RedDBError::Query("system schema is read-only".to_string()));
168        }
169        let store = self.inner.db.store();
170        let label = polymorphic_resolver::model_name(query.collection_model);
171        if store.get_collection(&query.name).is_some() {
172            if query.if_not_exists {
173                return Ok(RuntimeQueryResult::ok_message(
174                    raw_query.to_string(),
175                    &format!("{label} '{}' already exists", query.name),
176                    "create",
177                ));
178            }
179            return Err(RedDBError::Query(format!(
180                "{label} '{}' already exists",
181                query.name
182            )));
183        }
184
185        store
186            .create_collection(&query.name)
187            .map_err(|err| RedDBError::Internal(err.to_string()))?;
188        if query.collection_model == CollectionModel::Vault {
189            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190            let key_scope = if query.vault_own_master_key {
191                "own"
192            } else {
193                "cluster"
194            };
195            store.set_config_tree(
196                &format!("red.vault.{}.key_scope", query.name),
197                &crate::serde_json::Value::String(key_scope.to_string()),
198            );
199            store.set_config_tree(
200                &format!("red.vault.{}.status", query.name),
201                &crate::serde_json::Value::String("sealed".to_string()),
202            );
203        }
204        if query.collection_model == CollectionModel::Metrics {
205            for spec in &query.metrics_rollup_policies {
206                let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207                    .ok_or_else(|| {
208                    RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209                })?;
210                if policy.source != "raw" {
211                    return Err(RedDBError::Query(format!(
212                        "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213                        spec
214                    )));
215                }
216                if !matches!(
217                    policy.aggregation.as_str(),
218                    "avg" | "sum" | "min" | "max" | "count"
219                ) {
220                    return Err(RedDBError::Query(format!(
221                        "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222                        spec
223                    )));
224                }
225            }
226            if let Some(raw_retention_ms) = query.default_ttl_ms {
227                self.inner
228                    .db
229                    .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230                store.set_config_tree(
231                    &format!("red.metrics.{}.raw_retention_ms", query.name),
232                    &crate::serde_json::Value::Number(raw_retention_ms as f64),
233                );
234            }
235            let tenant_identity = query
236                .tenant_by
237                .clone()
238                .unwrap_or_else(|| "current_tenant".to_string());
239            store.set_config_tree(
240                &format!("red.metrics.{}.tenant_identity", query.name),
241                &crate::serde_json::Value::String(tenant_identity),
242            );
243            store.set_config_tree(
244                &format!("red.metrics.{}.namespace", query.name),
245                &crate::serde_json::Value::String("default".to_string()),
246            );
247            if !query.metrics_rollup_policies.is_empty() {
248                store.set_config_tree(
249                    &format!("red.metrics.{}.rollup_policies", query.name),
250                    &crate::serde_json::Value::Array(
251                        query
252                            .metrics_rollup_policies
253                            .iter()
254                            .cloned()
255                            .map(crate::serde_json::Value::String)
256                            .collect(),
257                    ),
258                );
259            }
260        }
261        let contract = if query.collection_model == CollectionModel::Metrics {
262            metrics_collection_contract(query)
263        } else {
264            keyed_collection_contract(&query.name, query.collection_model)
265        };
266        self.inner
267            .db
268            .save_collection_contract(contract)
269            .map_err(|err| RedDBError::Internal(err.to_string()))?;
270        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271            store.set_config_tree(
272                &format!("red.collection_tenants.{}", query.name),
273                &crate::serde_json::Value::String(tenant_id),
274            );
275        }
276        self.inner
277            .db
278            .persist_metadata()
279            .map_err(|err| RedDBError::Internal(err.to_string()))?;
280        self.invalidate_result_cache();
281
282        Ok(RuntimeQueryResult::ok_message(
283            raw_query.to_string(),
284            &format!("{label} '{}' created", query.name),
285            "create",
286        ))
287    }
288
289    pub fn execute_create_collection(
290        &self,
291        raw_query: &str,
292        query: &CreateCollectionQuery,
293    ) -> RedDBResult<RuntimeQueryResult> {
294        let model = match query.kind.as_str() {
295            "graph" => CollectionModel::Graph,
296            "document" => CollectionModel::Document,
297            "metrics" => CollectionModel::Metrics,
298            "vector.turbo" => {
299                let dimension = query.vector_dimension.ok_or_else(|| {
300                    RedDBError::Query(
301                        "CREATE COLLECTION KIND vector.turbo requires DIM".to_string(),
302                    )
303                })?;
304                let create = CreateVectorQuery {
305                    name: query.name.clone(),
306                    dimension,
307                    metric: query
308                        .vector_metric
309                        .unwrap_or(crate::storage::engine::distance::DistanceMetric::Cosine),
310                    if_not_exists: query.if_not_exists,
311                };
312                let result = self.execute_create_vector(raw_query, &create)?;
313                // Issue #693 — durable kind marker that distinguishes
314                // this collection from the legacy `vector` path on
315                // every restart. Runtime state (TurboQuantIndex +
316                // TurboExtent) is lazily materialised by
317                // `RedDB::turbo_state` on first INSERT/SEARCH so the
318                // marker is the only thing that needs to survive.
319                let store = self.inner.db.store();
320                crate::runtime::vector_turbo_kind::mark_as_turbo(&store, &query.name);
321                self.inner
322                    .db
323                    .persist_metadata()
324                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
325                // Eagerly construct the state so the TurboExtent (if
326                // any) is reserved at creation time rather than on
327                // first INSERT. Errors are swallowed: the lazy path in
328                // `turbo_state` will retry on the next access.
329                let _ = self.inner.db.turbo_state(&query.name);
330                return Ok(result);
331            }
332            // KIND blockchain — issue #523 foundation: stored on top of a
333            // Table-shaped collection. The `chain` marker + reserved-column
334            // discipline make the difference. Schema validation, conflict
335            // retries, and `verify_chain` come in later iterations.
336            "blockchain" => CollectionModel::Table,
337            other => {
338                return Err(RedDBError::Query(format!(
339                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
340                )));
341            }
342        };
343        let create = CreateTableQuery {
344            collection_model: model,
345            name: query.name.clone(),
346            columns: Vec::new(),
347            if_not_exists: query.if_not_exists,
348            default_ttl_ms: None,
349            metrics_rollup_policies: Vec::new(),
350            context_index_fields: Vec::new(),
351            context_index_enabled: false,
352            timestamps: false,
353            partition_by: None,
354            tenant_by: None,
355            append_only: false,
356            subscriptions: Vec::new(),
357            vault_own_master_key: false,
358        };
359        let result = self.execute_create_table(raw_query, &create)?;
360        if query.kind == "blockchain" {
361            self.install_blockchain_kind(&query.name)?;
362        }
363        // Issue #522 — wire `SIGNED_BY (...)` into the runtime. The parser
364        // already produces a validated 32-byte pubkey list; installing
365        // the registry stamps the per-collection signer set into
366        // `red_config` so the INSERT path can load it cheaply.
367        if !query.allowed_signers.is_empty() {
368            let actor = crate::runtime::impl_core::current_user_projected()
369                .unwrap_or_else(|| "@system/create-collection".to_string());
370            crate::runtime::signed_writes_kind::install(
371                &self.inner.db.store(),
372                &query.name,
373                &query.allowed_signers,
374                &actor,
375            );
376        }
377        Ok(result)
378    }
379
380    /// Stamp `red.collection.{name}.kind = "chain"` and append the genesis
381    /// row. Idempotent against `IF NOT EXISTS`: if the collection already
382    /// has a row at height 0 we leave it.
383    fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
384        use crate::runtime::blockchain_kind;
385        use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
386        use std::sync::Arc;
387
388        let store = self.inner.db.store();
389        blockchain_kind::mark_as_chain(&store, name);
390
391        let existing_tip = blockchain_kind::chain_tip(&store, name);
392        if existing_tip.height.is_some() {
393            return Ok(());
394        }
395
396        let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
397        let named: std::collections::HashMap<String, crate::storage::schema::Value> =
398            fields.into_iter().collect();
399        let entity = UnifiedEntity::new(
400            EntityId::new(0),
401            EntityKind::TableRow {
402                table: Arc::from(name),
403                row_id: 0,
404            },
405            EntityData::Row(RowData {
406                columns: Vec::new(),
407                named: Some(named),
408                schema: None,
409            }),
410        );
411        store
412            .insert_auto(name, entity)
413            .map_err(|err| RedDBError::Internal(err.to_string()))?;
414        // #524: prime the in-memory tip cache so the chain-tip endpoint and
415        // subsequent INSERTs don't have to scan the collection to find genesis.
416        if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
417            self.inner
418                .chain_tip_cache
419                .lock()
420                .insert(name.to_string(), tip);
421        }
422        Ok(())
423    }
424
425    pub fn execute_create_vector(
426        &self,
427        raw_query: &str,
428        query: &CreateVectorQuery,
429    ) -> RedDBResult<RuntimeQueryResult> {
430        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
431        if is_system_schema_name(&query.name) {
432            return Err(RedDBError::Query("system schema is read-only".to_string()));
433        }
434        let store = self.inner.db.store();
435        if store.get_collection(&query.name).is_some() {
436            if query.if_not_exists {
437                return Ok(RuntimeQueryResult::ok_message(
438                    raw_query.to_string(),
439                    &format!("vector '{}' already exists", query.name),
440                    "create",
441                ));
442            }
443            return Err(RedDBError::Query(format!(
444                "vector '{}' already exists",
445                query.name
446            )));
447        }
448
449        store
450            .create_collection(&query.name)
451            .map_err(|err| RedDBError::Internal(err.to_string()))?;
452        self.inner
453            .db
454            .save_collection_contract(vector_collection_contract(query))
455            .map_err(|err| RedDBError::Internal(err.to_string()))?;
456        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
457            store.set_config_tree(
458                &format!("red.collection_tenants.{}", query.name),
459                &crate::serde_json::Value::String(tenant_id),
460            );
461        }
462        self.inner
463            .db
464            .persist_metadata()
465            .map_err(|err| RedDBError::Internal(err.to_string()))?;
466        self.invalidate_result_cache();
467
468        Ok(RuntimeQueryResult::ok_message(
469            raw_query.to_string(),
470            &format!("vector '{}' created", query.name),
471            "create",
472        ))
473    }
474
475    fn provision_vault_key_material(
476        &self,
477        collection: &str,
478        own_master_key: bool,
479    ) -> RedDBResult<()> {
480        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
481            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
482        })?;
483        if !auth_store.is_vault_backed() {
484            return Err(RedDBError::Query(
485                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
486            ));
487        }
488
489        if auth_store.vault_secret_key().is_none() {
490            let key = crate::auth::store::random_bytes(32);
491            auth_store
492                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
493                .map_err(|err| RedDBError::Query(err.to_string()))?;
494        }
495
496        if own_master_key {
497            let key = crate::auth::store::random_bytes(32);
498            auth_store
499                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
500                .map_err(|err| RedDBError::Query(err.to_string()))?;
501        }
502
503        Ok(())
504    }
505
506    /// Execute DROP TABLE
507    ///
508    /// Drops the collection and all its data from the store.
509    pub fn execute_drop_table(
510        &self,
511        raw_query: &str,
512        query: &DropTableQuery,
513    ) -> RedDBResult<RuntimeQueryResult> {
514        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
515        let store = self.inner.db.store();
516
517        if is_system_schema_name(&query.name) {
518            return Err(RedDBError::Query("system schema is read-only".to_string()));
519        }
520
521        let exists = store.get_collection(&query.name).is_some();
522        if !exists {
523            if query.if_exists {
524                return Ok(RuntimeQueryResult::ok_message(
525                    raw_query.to_string(),
526                    &format!("table '{}' does not exist", query.name),
527                    "drop",
528                ));
529            }
530            return Err(RedDBError::NotFound(format!(
531                "table '{}' not found",
532                query.name
533            )));
534        }
535        let actual =
536            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
537        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
538
539        // Emit 1 collection_dropped event before storage is wiped.
540        // Queue is preserved; subscription is removed with the contract below.
541        let final_count = store
542            .get_collection(&query.name)
543            .map(|manager| manager.query_all(|_| true).len() as u64)
544            .unwrap_or(0);
545        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
546            self,
547            &query.name,
548            final_count,
549        )?;
550
551        let orphaned_indices: Vec<String> = self
552            .inner
553            .index_store
554            .list_indices(&query.name)
555            .into_iter()
556            .map(|index| index.name)
557            .collect();
558        for name in &orphaned_indices {
559            self.inner.index_store.drop_index(name, &query.name);
560        }
561
562        store
563            .drop_collection(&query.name)
564            .map_err(|err| RedDBError::Internal(err.to_string()))?;
565        self.inner.db.invalidate_vector_index(&query.name);
566        self.inner.db.clear_collection_default_ttl_ms(&query.name);
567        self.inner
568            .db
569            .remove_collection_contract(&query.name)
570            .map_err(|err| RedDBError::Internal(err.to_string()))?;
571        self.clear_table_planner_stats(&query.name);
572        self.invalidate_result_cache();
573        // Issue #119: a dropped collection vanishes from every
574        // (tenant, role)'s visible-collections set. Auth is optional in
575        // embedded mode so guard the call.
576        if let Some(store) = self.inner.auth_store.read().clone() {
577            store.invalidate_visible_collections_cache();
578        }
579        self.inner
580            .db
581            .persist_metadata()
582            .map_err(|err| RedDBError::Internal(err.to_string()))?;
583        // Issue #120 — drop both the collection entries *and* every
584        // index entry that was scoped to this collection.  Dropping the
585        // collection wipes columns + collection-name + type-tags +
586        // index hits in one pass via `purge_collection_entries`, so
587        // the explicit `DropIndex` calls would be redundant.
588        self.schema_vocabulary_apply(
589            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
590                collection: query.name.clone(),
591            },
592        );
593
594        Ok(RuntimeQueryResult::ok_message(
595            raw_query.to_string(),
596            &format!("table '{}' dropped", query.name),
597            "drop",
598        ))
599    }
600
601    pub fn execute_drop_graph(
602        &self,
603        raw_query: &str,
604        query: &DropGraphQuery,
605    ) -> RedDBResult<RuntimeQueryResult> {
606        self.execute_drop_typed_collection(
607            raw_query,
608            &query.name,
609            query.if_exists,
610            CollectionModel::Graph,
611            "graph",
612        )
613    }
614
615    pub fn execute_drop_vector(
616        &self,
617        raw_query: &str,
618        query: &DropVectorQuery,
619    ) -> RedDBResult<RuntimeQueryResult> {
620        self.execute_drop_typed_collection(
621            raw_query,
622            &query.name,
623            query.if_exists,
624            CollectionModel::Vector,
625            "vector",
626        )
627    }
628
629    pub fn execute_drop_document(
630        &self,
631        raw_query: &str,
632        query: &DropDocumentQuery,
633    ) -> RedDBResult<RuntimeQueryResult> {
634        self.execute_drop_typed_collection(
635            raw_query,
636            &query.name,
637            query.if_exists,
638            CollectionModel::Document,
639            "document",
640        )
641    }
642
643    pub fn execute_drop_kv(
644        &self,
645        raw_query: &str,
646        query: &DropKvQuery,
647    ) -> RedDBResult<RuntimeQueryResult> {
648        let label = polymorphic_resolver::model_name(query.model);
649        self.execute_drop_typed_collection(
650            raw_query,
651            &query.name,
652            query.if_exists,
653            query.model,
654            label,
655        )
656    }
657
658    pub fn execute_drop_collection(
659        &self,
660        raw_query: &str,
661        query: &DropCollectionQuery,
662    ) -> RedDBResult<RuntimeQueryResult> {
663        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
664        if is_system_schema_name(&query.name) {
665            return Err(RedDBError::Query("system schema is read-only".to_string()));
666        }
667        let store = self.inner.db.store();
668        if store.get_collection(&query.name).is_none() {
669            if query.if_exists {
670                return Ok(RuntimeQueryResult::ok_message(
671                    raw_query.to_string(),
672                    &format!("collection '{}' does not exist", query.name),
673                    "drop",
674                ));
675            }
676            return Err(RedDBError::NotFound(format!(
677                "collection '{}' not found",
678                query.name
679            )));
680        }
681
682        let actual =
683            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
684        if let Some(expected) = query.model {
685            polymorphic_resolver::ensure_model_match(expected, actual)?;
686        }
687
688        match actual {
689            CollectionModel::Table => self.execute_drop_table(
690                raw_query,
691                &DropTableQuery {
692                    name: query.name.clone(),
693                    if_exists: query.if_exists,
694                },
695            ),
696            CollectionModel::TimeSeries => self.execute_drop_timeseries(
697                raw_query,
698                &DropTimeSeriesQuery {
699                    name: query.name.clone(),
700                    if_exists: query.if_exists,
701                },
702            ),
703            CollectionModel::Queue => self.execute_drop_queue(
704                raw_query,
705                &DropQueueQuery {
706                    name: query.name.clone(),
707                    if_exists: query.if_exists,
708                },
709            ),
710            CollectionModel::Graph => self.execute_drop_graph(
711                raw_query,
712                &DropGraphQuery {
713                    name: query.name.clone(),
714                    if_exists: query.if_exists,
715                },
716            ),
717            CollectionModel::Vector => self.execute_drop_vector(
718                raw_query,
719                &DropVectorQuery {
720                    name: query.name.clone(),
721                    if_exists: query.if_exists,
722                },
723            ),
724            CollectionModel::Document => self.execute_drop_document(
725                raw_query,
726                &DropDocumentQuery {
727                    name: query.name.clone(),
728                    if_exists: query.if_exists,
729                },
730            ),
731            CollectionModel::Kv => self.execute_drop_kv(
732                raw_query,
733                &DropKvQuery {
734                    name: query.name.clone(),
735                    if_exists: query.if_exists,
736                    model: CollectionModel::Kv,
737                },
738            ),
739            CollectionModel::Config => self.execute_drop_kv(
740                raw_query,
741                &DropKvQuery {
742                    name: query.name.clone(),
743                    if_exists: query.if_exists,
744                    model: CollectionModel::Config,
745                },
746            ),
747            CollectionModel::Vault => self.execute_drop_kv(
748                raw_query,
749                &DropKvQuery {
750                    name: query.name.clone(),
751                    if_exists: query.if_exists,
752                    model: CollectionModel::Vault,
753                },
754            ),
755            CollectionModel::Hll => self.execute_probabilistic_command(
756                raw_query,
757                &ProbabilisticCommand::DropHll {
758                    name: query.name.clone(),
759                    if_exists: query.if_exists,
760                },
761            ),
762            CollectionModel::Sketch => self.execute_probabilistic_command(
763                raw_query,
764                &ProbabilisticCommand::DropSketch {
765                    name: query.name.clone(),
766                    if_exists: query.if_exists,
767                },
768            ),
769            CollectionModel::Filter => self.execute_probabilistic_command(
770                raw_query,
771                &ProbabilisticCommand::DropFilter {
772                    name: query.name.clone(),
773                    if_exists: query.if_exists,
774                },
775            ),
776            CollectionModel::Metrics => self.execute_drop_typed_collection(
777                raw_query,
778                &query.name,
779                query.if_exists,
780                CollectionModel::Metrics,
781                "metrics",
782            ),
783            CollectionModel::Mixed => self.execute_drop_typed_collection(
784                raw_query,
785                &query.name,
786                query.if_exists,
787                CollectionModel::Mixed,
788                "collection",
789            ),
790        }
791    }
792
793    /// Execute ALTER TABLE
794    ///
795    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
796    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
797    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
798    pub fn execute_alter_table(
799        &self,
800        raw_query: &str,
801        query: &AlterTableQuery,
802    ) -> RedDBResult<RuntimeQueryResult> {
803        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
804        let store = self.inner.db.store();
805
806        // Verify the table exists.
807        if store.get_collection(&query.name).is_none() {
808            return Err(RedDBError::NotFound(format!(
809                "table '{}' not found",
810                query.name
811            )));
812        }
813
814        let mut messages = Vec::new();
815
816        // Collect column-level changes upfront for schema-change event emission below.
817        let fields_added: Vec<String> = query
818            .operations
819            .iter()
820            .filter_map(|op| {
821                if let AlterOperation::AddColumn(col) = op {
822                    Some(col.name.clone())
823                } else {
824                    None
825                }
826            })
827            .collect();
828        let fields_removed: Vec<String> = query
829            .operations
830            .iter()
831            .filter_map(|op| {
832                if let AlterOperation::DropColumn(name) = op {
833                    Some(name.clone())
834                } else {
835                    None
836                }
837            })
838            .collect();
839
840        for op in &query.operations {
841            match op {
842                AlterOperation::AddColumn(col) => {
843                    // Schema-on-read: column will be available on next insert.
844                    messages.push(format!("column '{}' added", col.name));
845                }
846                AlterOperation::DropColumn(name) => {
847                    messages.push(format!("column '{}' dropped", name));
848                }
849                AlterOperation::RenameColumn { from, to } => {
850                    messages.push(format!("column '{}' renamed to '{}'", from, to));
851                }
852                AlterOperation::AttachPartition { child, bound } => {
853                    // Persist child → parent binding in red_config so the
854                    // future planner-side pruner can enumerate children and
855                    // evaluate their bounds.
856                    store.set_config_tree(
857                        &format!("partition.{}.children.{}", query.name, child),
858                        &crate::serde_json::Value::String(bound.clone()),
859                    );
860                    messages.push(format!(
861                        "partition '{child}' attached to '{}' ({bound})",
862                        query.name
863                    ));
864                }
865                AlterOperation::DetachPartition { child } => {
866                    store.set_config_tree(
867                        &format!("partition.{}.children.{}", query.name, child),
868                        &crate::serde_json::Value::Null,
869                    );
870                    messages.push(format!(
871                        "partition '{child}' detached from '{}'",
872                        query.name
873                    ));
874                }
875                AlterOperation::EnableRowLevelSecurity => {
876                    self.inner
877                        .rls_enabled_tables
878                        .write()
879                        .insert(query.name.clone());
880                    // Persist flag so RLS survives restart via red_config.
881                    store.set_config_tree(
882                        &format!("rls.enabled.{}", query.name),
883                        &crate::serde_json::Value::Bool(true),
884                    );
885                    self.invalidate_plan_cache();
886                    messages.push(format!("row level security enabled on '{}'", query.name));
887                }
888                AlterOperation::DisableRowLevelSecurity => {
889                    self.inner.rls_enabled_tables.write().remove(&query.name);
890                    store.set_config_tree(
891                        &format!("rls.enabled.{}", query.name),
892                        &crate::serde_json::Value::Null,
893                    );
894                    self.invalidate_plan_cache();
895                    messages.push(format!("row level security disabled on '{}'", query.name));
896                }
897                // Phase 2.5.4: retrofit tenancy onto an existing table.
898                AlterOperation::EnableTenancy { column } => {
899                    store.set_config_tree(
900                        &format!("tenant_tables.{}.column", query.name),
901                        &crate::serde_json::Value::String(column.clone()),
902                    );
903                    self.register_tenant_table(&query.name, column);
904                    self.invalidate_plan_cache();
905                    messages.push(format!(
906                        "tenancy enabled on '{}' by column '{column}'",
907                        query.name
908                    ));
909                }
910                AlterOperation::DisableTenancy => {
911                    store.set_config_tree(
912                        &format!("tenant_tables.{}.column", query.name),
913                        &crate::serde_json::Value::Null,
914                    );
915                    self.unregister_tenant_table(&query.name);
916                    self.invalidate_plan_cache();
917                    messages.push(format!("tenancy disabled on '{}'", query.name));
918                }
919                AlterOperation::SetAppendOnly(on) => {
920                    // Contract is the single source of truth for the
921                    // UPDATE/DELETE parse-time guard. The flag lands
922                    // below via `apply_alter_operations_to_contract`;
923                    // here we only publish the human-readable message.
924                    messages.push(format!(
925                        "append_only {} on '{}'",
926                        if *on { "enabled" } else { "disabled" },
927                        query.name
928                    ));
929                }
930                AlterOperation::SetVersioned(on) => {
931                    // Opt the collection into (or out of) Git-for-Data.
932                    // Persists a row in red_vcs_settings; next AS OF /
933                    // merge / diff against this table honours the
934                    // flag. Retroactive: existing row versions whose
935                    // xmins are still pinned by commits become
936                    // reachable via AS OF COMMIT immediately.
937                    self.vcs_set_versioned(&query.name, *on)?;
938                    messages.push(format!(
939                        "versioned {} on '{}'",
940                        if *on { "enabled" } else { "disabled" },
941                        query.name
942                    ));
943                }
944                AlterOperation::EnableEvents(subscription) => {
945                    let mut subscription = subscription.clone();
946                    subscription.source = query.name.clone();
947                    validate_event_subscriptions(
948                        self,
949                        &query.name,
950                        std::slice::from_ref(&subscription),
951                    )?;
952                    ensure_event_target_queue(self, &subscription.target_queue)?;
953                    messages.push(format!(
954                        "events enabled on '{}' to '{}'",
955                        query.name, subscription.target_queue
956                    ));
957                }
958                AlterOperation::DisableEvents => {
959                    messages.push(format!("events disabled on '{}'", query.name));
960                }
961                AlterOperation::AddSubscription { name, descriptor } => {
962                    let mut sub = descriptor.clone();
963                    sub.name = name.clone();
964                    sub.source = query.name.clone();
965                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
966                    ensure_event_target_queue(self, &sub.target_queue)?;
967                    messages.push(format!(
968                        "subscription '{}' added on '{}' to '{}'",
969                        name, query.name, sub.target_queue
970                    ));
971                }
972                AlterOperation::DropSubscription { name } => {
973                    messages.push(format!(
974                        "subscription '{}' dropped on '{}'",
975                        name, query.name
976                    ));
977                }
978                AlterOperation::AddSigner { pubkey } => {
979                    // Issue #522 — admin-gated registry mutation. The
980                    // standard DDL `check_write` above gates by role; we
981                    // additionally verify the collection actually has a
982                    // signed-writes registry installed so this isn't a
983                    // covert way to retrofit one (use `CREATE COLLECTION
984                    // ... SIGNED_BY (...)` for that).
985                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
986                        return Err(RedDBError::Query(format!(
987                            "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
988                             recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
989                            query.name
990                        )));
991                    }
992                    let actor = crate::runtime::impl_core::current_user_projected()
993                        .unwrap_or_else(|| "@system/alter".to_string());
994                    let changed = crate::runtime::signed_writes_kind::add_signer(
995                        &store,
996                        &query.name,
997                        *pubkey,
998                        &actor,
999                    );
1000                    messages.push(format!(
1001                        "signer {} on '{}'",
1002                        if changed { "added" } else { "already present" },
1003                        query.name
1004                    ));
1005                }
1006                AlterOperation::RevokeSigner { pubkey } => {
1007                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
1008                        return Err(RedDBError::Query(format!(
1009                            "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
1010                            query.name
1011                        )));
1012                    }
1013                    let actor = crate::runtime::impl_core::current_user_projected()
1014                        .unwrap_or_else(|| "@system/alter".to_string());
1015                    let changed = crate::runtime::signed_writes_kind::revoke_signer(
1016                        &store,
1017                        &query.name,
1018                        pubkey,
1019                        &actor,
1020                    );
1021                    messages.push(format!(
1022                        "signer {} on '{}'",
1023                        if changed {
1024                            "revoked"
1025                        } else {
1026                            "already revoked"
1027                        },
1028                        query.name
1029                    ));
1030                }
1031                AlterOperation::SetRetention { duration_ms } => {
1032                    // Issue #580 — validate that the collection has a
1033                    // timestamp column the lazy-on-scan filter can key
1034                    // off. Without one there is no anchor for "older
1035                    // than now - duration"; reject at ALTER time so the
1036                    // policy can never silently hide all rows.
1037                    let existing = self.inner.db.collection_contract(&query.name);
1038                    let has_ts_column = existing
1039                        .as_ref()
1040                        .map(retention_timestamp_column_exists)
1041                        .unwrap_or(false);
1042                    if !has_ts_column {
1043                        return Err(RedDBError::Query(format!(
1044                            "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1045                             column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1046                             or enable WITH timestamps = true before setting a \
1047                             retention policy",
1048                            query.name
1049                        )));
1050                    }
1051                    messages.push(format!(
1052                        "retention set to {duration_ms} ms on '{}'",
1053                        query.name
1054                    ));
1055                }
1056                AlterOperation::UnsetRetention => {
1057                    messages.push(format!("retention cleared on '{}'", query.name));
1058                }
1059            }
1060        }
1061
1062        let mut contract = self
1063            .inner
1064            .db
1065            .collection_contract(&query.name)
1066            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1067        apply_alter_operations_to_contract(&mut contract, &query.operations);
1068        contract.version = contract.version.saturating_add(1);
1069        contract.updated_at_unix_ms = current_unix_ms();
1070        self.inner
1071            .db
1072            .save_collection_contract(contract)
1073            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1074        // Issue #301 — emit OperatorEvent when column schema changes on a
1075        // collection that has active event subscriptions, so operators know
1076        // downstream consumers may see a different payload shape.
1077        if !fields_added.is_empty() || !fields_removed.is_empty() {
1078            let sub_names: Vec<String> = self
1079                .inner
1080                .db
1081                .collection_contract(&query.name)
1082                .map(|c| {
1083                    c.subscriptions
1084                        .iter()
1085                        .filter(|s| s.enabled)
1086                        .map(|s| s.name.clone())
1087                        .collect()
1088                })
1089                .unwrap_or_default();
1090            if !sub_names.is_empty() {
1091                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1092                    collection: query.name.clone(),
1093                    subscription_names: sub_names.join(", "),
1094                    fields_added: fields_added.join(", "),
1095                    fields_removed: fields_removed.join(", "),
1096                    lsn: self.cdc_current_lsn(),
1097                }
1098                .emit_global();
1099            }
1100        }
1101
1102        self.clear_table_planner_stats(&query.name);
1103        self.invalidate_result_cache();
1104        // Issue #120 — refresh the schema-vocabulary entries from the
1105        // post-ALTER contract. Drop+recreate inside the index keeps
1106        // the invalidation guarantee complete (no stale columns from
1107        // before an ALTER ... DROP COLUMN).
1108        let post_alter_columns: Vec<String> = self
1109            .inner
1110            .db
1111            .collection_contract(&query.name)
1112            .map(|contract| {
1113                contract
1114                    .declared_columns
1115                    .iter()
1116                    .map(|col| col.name.clone())
1117                    .collect()
1118            })
1119            .unwrap_or_default();
1120        self.schema_vocabulary_apply(
1121            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1122                collection: query.name.clone(),
1123                columns: post_alter_columns,
1124                type_tags: Vec::new(),
1125                description: None,
1126            },
1127        );
1128
1129        let message = if messages.is_empty() {
1130            format!("table '{}' altered (no operations)", query.name)
1131        } else {
1132            format!("table '{}' altered: {}", query.name, messages.join(", "))
1133        };
1134
1135        Ok(RuntimeQueryResult::ok_message(
1136            raw_query.to_string(),
1137            &message,
1138            "alter",
1139        ))
1140    }
1141
1142    /// Execute EXPLAIN ALTER FOR CREATE TABLE
1143    ///
1144    /// Pure read: computes the schema diff between the target table's
1145    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
1146    /// and returns it as SQL `ALTER TABLE` text (default) or structured
1147    /// JSON. Never mutates storage.
1148    pub fn execute_explain_alter(
1149        &self,
1150        raw_query: &str,
1151        query: &ExplainAlterQuery,
1152    ) -> RedDBResult<RuntimeQueryResult> {
1153        // Validate the target CREATE TABLE body so syntactically valid
1154        // but semantically broken targets (bad SQL types, duplicate
1155        // columns) are caught here rather than inside the diff engine.
1156        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1157
1158        let current_contract = self.inner.db.collection_contract(&query.target.name);
1159
1160        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1161            .as_ref()
1162            .map(|c| c.declared_columns.clone())
1163            .unwrap_or_default();
1164
1165        let diff = super::schema_diff::compute_column_diff(
1166            &query.target.name,
1167            &current_columns,
1168            &query.target.columns,
1169        );
1170
1171        let rendered = match query.format {
1172            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1173            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1174        };
1175
1176        let format_label = match query.format {
1177            ExplainFormat::Sql => "sql",
1178            ExplainFormat::Json => "json",
1179        };
1180
1181        let columns = vec![
1182            "table".to_string(),
1183            "format".to_string(),
1184            "diff".to_string(),
1185        ];
1186        let row = vec![
1187            ("table".to_string(), Value::text(query.target.name.clone())),
1188            ("format".to_string(), Value::text(format_label.to_string())),
1189            ("diff".to_string(), Value::text(rendered)),
1190        ];
1191
1192        Ok(RuntimeQueryResult::ok_records(
1193            raw_query.to_string(),
1194            columns,
1195            vec![row],
1196            "explain",
1197        ))
1198    }
1199
1200    /// Execute CREATE INDEX
1201    ///
1202    /// Registers a new index on a collection, builds it from existing data,
1203    /// and makes it available to the query executor for O(1) lookups.
1204    pub fn execute_create_index(
1205        &self,
1206        raw_query: &str,
1207        query: &CreateIndexQuery,
1208    ) -> RedDBResult<RuntimeQueryResult> {
1209        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1210        let store = self.inner.db.store();
1211
1212        // Verify the table exists
1213        let manager = store
1214            .get_collection(&query.table)
1215            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1216
1217        let method_kind = match query.method {
1218            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1219            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1220            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1221            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1222        };
1223
1224        // Extract fields from existing entities for indexing. Row
1225        // entities may arrive in either the "named" HashMap layout
1226        // (gRPC `BulkInsertBinary` path) OR the columnar layout
1227        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
1228        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
1229        // leaves `named == None`). Prior to this commit the columnar
1230        // branch returned an empty field list, so `CREATE INDEX` built
1231        // a zero-entity index over HTTP-inserted data even though the
1232        // data was queryable via `SELECT`.
1233        let entities = manager.query_all(|_| true);
1234        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1235            entities
1236                .iter()
1237                .map(|e| {
1238                    let fields = match &e.data {
1239                        crate::storage::EntityData::Row(row) => {
1240                            if let Some(ref named) = row.named {
1241                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1242                            } else if let Some(ref schema) = row.schema {
1243                                // Columnar layout — pair each column
1244                                // with its positional name from the
1245                                // shared schema Arc.
1246                                schema
1247                                    .iter()
1248                                    .zip(row.columns.iter())
1249                                    .map(|(k, v)| (k.clone(), v.clone()))
1250                                    .collect()
1251                            } else {
1252                                Vec::new()
1253                            }
1254                        }
1255                        crate::storage::EntityData::Node(node) => node
1256                            .properties
1257                            .iter()
1258                            .map(|(k, v)| (k.clone(), v.clone()))
1259                            .collect(),
1260                        _ => Vec::new(),
1261                    };
1262                    (e.id, fields)
1263                })
1264                .collect();
1265
1266        // Build the index
1267        let indexed_count = self
1268            .inner
1269            .index_store
1270            .create_index(
1271                &query.name,
1272                &query.table,
1273                &query.columns,
1274                method_kind,
1275                query.unique,
1276                &entity_fields,
1277            )
1278            .map_err(RedDBError::Internal)?;
1279
1280        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1281            &query.table,
1282            &entity_fields,
1283        );
1284        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1285        self.invalidate_plan_cache();
1286
1287        // Register metadata
1288        self.inner
1289            .index_store
1290            .register(super::index_store::RegisteredIndex {
1291                name: query.name.clone(),
1292                collection: query.table.clone(),
1293                columns: query.columns.clone(),
1294                method: method_kind,
1295                unique: query.unique,
1296            });
1297        // Issue #120 — surface the index name + indexed columns in
1298        // the schema-vocabulary so AskPipeline (#121) can resolve
1299        // "the email index" back to its collection.
1300        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1301            collection: query.table.clone(),
1302            index: query.name.clone(),
1303            columns: query.columns.clone(),
1304        });
1305
1306        let method_str = format!("{}", query.method);
1307        let unique_str = if query.unique { "unique " } else { "" };
1308        let cols = query.columns.join(", ");
1309
1310        Ok(RuntimeQueryResult::ok_message(
1311            raw_query.to_string(),
1312            &format!(
1313                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1314                unique_str, query.name, query.table, cols, method_str, indexed_count
1315            ),
1316            "create",
1317        ))
1318    }
1319
1320    /// Execute DROP INDEX
1321    ///
1322    /// Removes an index from a collection.
1323    pub fn execute_drop_index(
1324        &self,
1325        raw_query: &str,
1326        query: &DropIndexQuery,
1327    ) -> RedDBResult<RuntimeQueryResult> {
1328        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1329        let store = self.inner.db.store();
1330
1331        // Verify the table exists
1332        if store.get_collection(&query.table).is_none() {
1333            if query.if_exists {
1334                return Ok(RuntimeQueryResult::ok_message(
1335                    raw_query.to_string(),
1336                    &format!("table '{}' does not exist", query.table),
1337                    "drop",
1338                ));
1339            }
1340            return Err(RedDBError::NotFound(format!(
1341                "table '{}' not found",
1342                query.table
1343            )));
1344        }
1345
1346        // Remove from IndexStore
1347        self.inner.index_store.drop_index(&query.name, &query.table);
1348        self.invalidate_plan_cache();
1349        // Issue #120 — keep the schema-vocabulary index entry in sync.
1350        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1351            collection: query.table.clone(),
1352            index: query.name.clone(),
1353        });
1354
1355        Ok(RuntimeQueryResult::ok_message(
1356            raw_query.to_string(),
1357            &format!("index '{}' dropped from '{}'", query.name, query.table),
1358            "drop",
1359        ))
1360    }
1361
1362    fn execute_drop_typed_collection(
1363        &self,
1364        raw_query: &str,
1365        name: &str,
1366        if_exists: bool,
1367        expected_model: CollectionModel,
1368        label: &str,
1369    ) -> RedDBResult<RuntimeQueryResult> {
1370        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1371        if is_system_schema_name(name) {
1372            return Err(RedDBError::Query("system schema is read-only".to_string()));
1373        }
1374        let store = self.inner.db.store();
1375        if store.get_collection(name).is_none() {
1376            if if_exists {
1377                return Ok(RuntimeQueryResult::ok_message(
1378                    raw_query.to_string(),
1379                    &format!("{label} '{name}' does not exist"),
1380                    "drop",
1381                ));
1382            }
1383            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1384        }
1385
1386        let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1387        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1388        self.drop_collection_storage(raw_query, name, label)
1389    }
1390
1391    pub fn execute_truncate(
1392        &self,
1393        raw_query: &str,
1394        query: &TruncateQuery,
1395    ) -> RedDBResult<RuntimeQueryResult> {
1396        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1397        if is_system_schema_name(&query.name) {
1398            return Err(RedDBError::Query("system schema is read-only".to_string()));
1399        }
1400
1401        let label = query
1402            .model
1403            .map(polymorphic_resolver::model_name)
1404            .unwrap_or("collection");
1405        let store = self.inner.db.store();
1406        if store.get_collection(&query.name).is_none() {
1407            if query.if_exists {
1408                return Ok(RuntimeQueryResult::ok_message(
1409                    raw_query.to_string(),
1410                    &format!("{label} '{}' does not exist", query.name),
1411                    "truncate",
1412                ));
1413            }
1414            return Err(RedDBError::NotFound(format!(
1415                "{label} '{}' not found",
1416                query.name
1417            )));
1418        }
1419
1420        let actual =
1421            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1422        if let Some(expected) = query.model {
1423            polymorphic_resolver::ensure_model_match(expected, actual)?;
1424        }
1425
1426        if actual == CollectionModel::Queue {
1427            return self.execute_queue_command(
1428                raw_query,
1429                &QueueCommand::Purge {
1430                    queue: query.name.clone(),
1431                },
1432            );
1433        }
1434
1435        // Count before wiping so we can emit the aggregated truncate event.
1436        let affected = self.truncate_collection_entities(&query.name)?;
1437        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1438        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1439        self.inner.db.invalidate_vector_index(&query.name);
1440        self.clear_table_planner_stats(&query.name);
1441        self.invalidate_result_cache();
1442
1443        Ok(RuntimeQueryResult::ok_message(
1444            raw_query.to_string(),
1445            &format!(
1446                "{affected} entities truncated from {label} '{}'",
1447                query.name
1448            ),
1449            "truncate",
1450        ))
1451    }
1452
1453    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1454        let store = self.inner.db.store();
1455        let Some(manager) = store.get_collection(name) else {
1456            return Ok(0);
1457        };
1458        let entities = manager.query_all(|_| true);
1459        if entities.is_empty() {
1460            return Ok(0);
1461        }
1462
1463        for entity in &entities {
1464            let fields = entity_index_fields(&entity.data);
1465            self.inner
1466                .index_store
1467                .index_entity_delete(name, entity.id, &fields)
1468                .map_err(RedDBError::Internal)?;
1469        }
1470
1471        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1472        let deleted_ids = store
1473            .delete_batch(name, &ids)
1474            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1475        for id in &deleted_ids {
1476            store.context_index().remove_entity(*id);
1477        }
1478        Ok(deleted_ids.len() as u64)
1479    }
1480
1481    fn drop_collection_storage(
1482        &self,
1483        raw_query: &str,
1484        name: &str,
1485        label: &str,
1486    ) -> RedDBResult<RuntimeQueryResult> {
1487        let store = self.inner.db.store();
1488
1489        // Emit 1 collection_dropped event before storage is wiped.
1490        // Queue is preserved; subscription is removed with the contract below.
1491        let final_count = store
1492            .get_collection(name)
1493            .map(|manager| manager.query_all(|_| true).len() as u64)
1494            .unwrap_or(0);
1495        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1496            self,
1497            name,
1498            final_count,
1499        )?;
1500
1501        let orphaned_indices: Vec<String> = self
1502            .inner
1503            .index_store
1504            .list_indices(name)
1505            .into_iter()
1506            .map(|index| index.name)
1507            .collect();
1508        for index_name in &orphaned_indices {
1509            self.inner.index_store.drop_index(index_name, name);
1510        }
1511
1512        store
1513            .drop_collection(name)
1514            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1515        self.inner.db.invalidate_vector_index(name);
1516        self.inner.db.clear_collection_default_ttl_ms(name);
1517        self.inner
1518            .db
1519            .remove_collection_contract(name)
1520            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1521        self.clear_table_planner_stats(name);
1522        self.invalidate_result_cache();
1523        if let Some(store) = self.inner.auth_store.read().clone() {
1524            store.invalidate_visible_collections_cache();
1525        }
1526        self.inner
1527            .db
1528            .persist_metadata()
1529            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1530        self.schema_vocabulary_apply(
1531            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1532                collection: name.to_string(),
1533            },
1534        );
1535
1536        Ok(RuntimeQueryResult::ok_message(
1537            raw_query.to_string(),
1538            &format!("{label} '{name}' dropped"),
1539            "drop",
1540        ))
1541    }
1542}
1543
/// Reports whether `name` lies in the reserved system namespace.
///
/// A name is reserved when it is exactly `red`, starts with `red.`, or
/// starts with `__red_schema_`. The comparison is case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];
    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1547
1548fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1549    match data {
1550        EntityData::Row(row) => {
1551            if let Some(ref named) = row.named {
1552                named
1553                    .iter()
1554                    .map(|(key, value)| (key.clone(), value.clone()))
1555                    .collect()
1556            } else if let Some(ref schema) = row.schema {
1557                schema
1558                    .iter()
1559                    .zip(row.columns.iter())
1560                    .map(|(key, value)| (key.clone(), value.clone()))
1561                    .collect()
1562            } else {
1563                Vec::new()
1564            }
1565        }
1566        EntityData::Node(node) => node
1567            .properties
1568            .iter()
1569            .map(|(key, value)| (key.clone(), value.clone()))
1570            .collect(),
1571        _ => Vec::new(),
1572    }
1573}
1574
1575fn collection_contract_from_create_table(
1576    query: &CreateTableQuery,
1577) -> RedDBResult<crate::physical::CollectionContract> {
1578    let now = current_unix_ms();
1579    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1580        .columns
1581        .iter()
1582        .map(declared_column_contract_from_ddl)
1583        .collect();
1584    if query.timestamps {
1585        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1586        // columns that the write path populates from
1587        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1588        declared_columns.push(crate::physical::DeclaredColumnContract {
1589            name: "created_at".to_string(),
1590            data_type: "BIGINT".to_string(),
1591            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1592            not_null: true,
1593            default: None,
1594            compress: None,
1595            unique: false,
1596            primary_key: false,
1597            enum_variants: Vec::new(),
1598            array_element: None,
1599            decimal_precision: None,
1600        });
1601        declared_columns.push(crate::physical::DeclaredColumnContract {
1602            name: "updated_at".to_string(),
1603            data_type: "BIGINT".to_string(),
1604            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1605            not_null: true,
1606            default: None,
1607            compress: None,
1608            unique: false,
1609            primary_key: false,
1610            enum_variants: Vec::new(),
1611            array_element: None,
1612            decimal_precision: None,
1613        });
1614    }
1615    Ok(crate::physical::CollectionContract {
1616        name: query.name.clone(),
1617        declared_model: crate::catalog::CollectionModel::Table,
1618        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1619        origin: crate::physical::ContractOrigin::Explicit,
1620        version: 1,
1621        created_at_unix_ms: now,
1622        updated_at_unix_ms: now,
1623        default_ttl_ms: query.default_ttl_ms,
1624        vector_dimension: None,
1625        vector_metric: None,
1626        context_index_fields: query.context_index_fields.clone(),
1627        declared_columns,
1628        table_def: Some(build_table_def_from_create_table(query)?),
1629        timestamps_enabled: query.timestamps,
1630        context_index_enabled: query.context_index_enabled
1631            || !query.context_index_fields.is_empty(),
1632        metrics_raw_retention_ms: None,
1633        metrics_rollup_policies: Vec::new(),
1634        metrics_tenant_identity: None,
1635        metrics_namespace: None,
1636        append_only: query.append_only,
1637        subscriptions: query.subscriptions.clone(),
1638        session_key: None,
1639        session_gap_ms: None,
1640        retention_duration_ms: None,
1641    })
1642}
1643
1644fn default_collection_contract_for_existing_table(
1645    name: &str,
1646) -> crate::physical::CollectionContract {
1647    let now = current_unix_ms();
1648    crate::physical::CollectionContract {
1649        name: name.to_string(),
1650        declared_model: crate::catalog::CollectionModel::Table,
1651        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1652        origin: crate::physical::ContractOrigin::Explicit,
1653        version: 0,
1654        created_at_unix_ms: now,
1655        updated_at_unix_ms: now,
1656        default_ttl_ms: None,
1657        vector_dimension: None,
1658        vector_metric: None,
1659        context_index_fields: Vec::new(),
1660        declared_columns: Vec::new(),
1661        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1662        timestamps_enabled: false,
1663        context_index_enabled: false,
1664        metrics_raw_retention_ms: None,
1665        metrics_rollup_policies: Vec::new(),
1666        metrics_tenant_identity: None,
1667        metrics_namespace: None,
1668        append_only: false,
1669        subscriptions: Vec::new(),
1670        session_key: None,
1671        session_gap_ms: None,
1672        retention_duration_ms: None,
1673    }
1674}
1675
1676fn keyed_collection_contract(
1677    name: &str,
1678    model: crate::catalog::CollectionModel,
1679) -> crate::physical::CollectionContract {
1680    let now = current_unix_ms();
1681    crate::physical::CollectionContract {
1682        name: name.to_string(),
1683        declared_model: model,
1684        schema_mode: crate::catalog::SchemaMode::Dynamic,
1685        origin: crate::physical::ContractOrigin::Explicit,
1686        version: 1,
1687        created_at_unix_ms: now,
1688        updated_at_unix_ms: now,
1689        default_ttl_ms: None,
1690        vector_dimension: None,
1691        vector_metric: None,
1692        context_index_fields: Vec::new(),
1693        declared_columns: Vec::new(),
1694        table_def: None,
1695        timestamps_enabled: false,
1696        context_index_enabled: false,
1697        metrics_raw_retention_ms: None,
1698        metrics_rollup_policies: Vec::new(),
1699        metrics_tenant_identity: None,
1700        metrics_namespace: None,
1701        append_only: false,
1702        subscriptions: Vec::new(),
1703        session_key: None,
1704        session_gap_ms: None,
1705        retention_duration_ms: None,
1706    }
1707}
1708
1709fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1710    let now = current_unix_ms();
1711    crate::physical::CollectionContract {
1712        name: query.name.clone(),
1713        declared_model: crate::catalog::CollectionModel::Metrics,
1714        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1715        origin: crate::physical::ContractOrigin::Explicit,
1716        version: 1,
1717        created_at_unix_ms: now,
1718        updated_at_unix_ms: now,
1719        default_ttl_ms: query.default_ttl_ms,
1720        vector_dimension: None,
1721        vector_metric: None,
1722        context_index_fields: Vec::new(),
1723        declared_columns: Vec::new(),
1724        table_def: None,
1725        timestamps_enabled: false,
1726        context_index_enabled: false,
1727        metrics_raw_retention_ms: query.default_ttl_ms,
1728        metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1729        metrics_tenant_identity: Some(
1730            query
1731                .tenant_by
1732                .clone()
1733                .unwrap_or_else(|| "current_tenant".to_string()),
1734        ),
1735        metrics_namespace: Some("default".to_string()),
1736        append_only: true,
1737        subscriptions: Vec::new(),
1738        session_key: None,
1739        session_gap_ms: None,
1740        retention_duration_ms: None,
1741    }
1742}
1743
1744fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1745    let now = current_unix_ms();
1746    crate::physical::CollectionContract {
1747        name: query.name.clone(),
1748        declared_model: crate::catalog::CollectionModel::Vector,
1749        schema_mode: crate::catalog::SchemaMode::Dynamic,
1750        origin: crate::physical::ContractOrigin::Explicit,
1751        version: 1,
1752        created_at_unix_ms: now,
1753        updated_at_unix_ms: now,
1754        default_ttl_ms: None,
1755        vector_dimension: Some(query.dimension),
1756        vector_metric: Some(query.metric),
1757        context_index_fields: Vec::new(),
1758        declared_columns: Vec::new(),
1759        table_def: None,
1760        timestamps_enabled: false,
1761        context_index_enabled: false,
1762        metrics_raw_retention_ms: None,
1763        metrics_rollup_policies: Vec::new(),
1764        metrics_tenant_identity: None,
1765        metrics_namespace: None,
1766        append_only: false,
1767        subscriptions: Vec::new(),
1768        session_key: None,
1769        session_gap_ms: None,
1770        retention_duration_ms: None,
1771    }
1772}
1773
1774fn declared_column_contract_from_ddl(
1775    column: &CreateColumnDef,
1776) -> crate::physical::DeclaredColumnContract {
1777    crate::physical::DeclaredColumnContract {
1778        name: column.name.clone(),
1779        data_type: column.data_type.clone(),
1780        sql_type: Some(column.sql_type.clone()),
1781        not_null: column.not_null,
1782        default: column.default.clone(),
1783        compress: column.compress,
1784        unique: column.unique,
1785        primary_key: column.primary_key,
1786        enum_variants: column.enum_variants.clone(),
1787        array_element: column.array_element.clone(),
1788        decimal_precision: column.decimal_precision,
1789    }
1790}
1791
1792fn apply_alter_operations_to_contract(
1793    contract: &mut crate::physical::CollectionContract,
1794    operations: &[AlterOperation],
1795) {
1796    if contract.table_def.is_none() {
1797        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
1798    }
1799    for operation in operations {
1800        match operation {
1801            AlterOperation::AddColumn(column) => {
1802                if !contract
1803                    .declared_columns
1804                    .iter()
1805                    .any(|existing| existing.name == column.name)
1806                {
1807                    contract
1808                        .declared_columns
1809                        .push(declared_column_contract_from_ddl(column));
1810                }
1811                if let Some(table_def) = contract.table_def.as_mut() {
1812                    if table_def.get_column(&column.name).is_none() {
1813                        if let Ok(column_def) = column_def_from_ddl(column) {
1814                            if column.primary_key {
1815                                table_def.primary_key.push(column.name.clone());
1816                                table_def.constraints.push(
1817                                    crate::storage::schema::Constraint::new(
1818                                        format!("pk_{}", column.name),
1819                                        crate::storage::schema::ConstraintType::PrimaryKey,
1820                                    )
1821                                    .on_columns(vec![column.name.clone()]),
1822                                );
1823                            }
1824                            if column.unique {
1825                                table_def.constraints.push(
1826                                    crate::storage::schema::Constraint::new(
1827                                        format!("uniq_{}", column.name),
1828                                        crate::storage::schema::ConstraintType::Unique,
1829                                    )
1830                                    .on_columns(vec![column.name.clone()]),
1831                                );
1832                            }
1833                            if column.not_null {
1834                                table_def.constraints.push(
1835                                    crate::storage::schema::Constraint::new(
1836                                        format!("not_null_{}", column.name),
1837                                        crate::storage::schema::ConstraintType::NotNull,
1838                                    )
1839                                    .on_columns(vec![column.name.clone()]),
1840                                );
1841                            }
1842                            table_def.columns.push(column_def);
1843                        }
1844                    }
1845                }
1846            }
1847            AlterOperation::DropColumn(name) => {
1848                contract
1849                    .declared_columns
1850                    .retain(|column| column.name != *name);
1851                if let Some(table_def) = contract.table_def.as_mut() {
1852                    if let Some(index) = table_def.column_index(name) {
1853                        table_def.columns.remove(index);
1854                    }
1855                    table_def.primary_key.retain(|column| column != name);
1856                    table_def.constraints.retain(|constraint| {
1857                        !constraint.columns.iter().any(|column| column == name)
1858                    });
1859                    table_def
1860                        .indexes
1861                        .retain(|index| !index.columns.iter().any(|column| column == name));
1862                }
1863            }
1864            AlterOperation::RenameColumn { from, to } => {
1865                if contract
1866                    .declared_columns
1867                    .iter()
1868                    .any(|column| column.name == *to)
1869                {
1870                    continue;
1871                }
1872                if let Some(column) = contract
1873                    .declared_columns
1874                    .iter_mut()
1875                    .find(|column| column.name == *from)
1876                {
1877                    column.name = to.clone();
1878                }
1879                if let Some(table_def) = contract.table_def.as_mut() {
1880                    if let Some(column) = table_def
1881                        .columns
1882                        .iter_mut()
1883                        .find(|column| column.name == *from)
1884                    {
1885                        column.name = to.clone();
1886                    }
1887                    for primary_key in &mut table_def.primary_key {
1888                        if *primary_key == *from {
1889                            *primary_key = to.clone();
1890                        }
1891                    }
1892                    for constraint in &mut table_def.constraints {
1893                        for column in &mut constraint.columns {
1894                            if *column == *from {
1895                                *column = to.clone();
1896                            }
1897                        }
1898                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
1899                            for column in ref_columns {
1900                                if *column == *from {
1901                                    *column = to.clone();
1902                                }
1903                            }
1904                        }
1905                    }
1906                    for index in &mut table_def.indexes {
1907                        for column in &mut index.columns {
1908                            if *column == *from {
1909                                *column = to.clone();
1910                            }
1911                        }
1912                    }
1913                }
1914            }
1915            // Partition ops don't touch the column contract — metadata is
1916            // persisted separately via `red_config.partition.*`.
1917            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
1918            // RLS toggles don't touch the column contract — flag is persisted
1919            // separately via `red_config.rls.enabled.{table}` and enforced
1920            // through the in-memory `rls_enabled_tables` set.
1921            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
1922            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
1923            // and are enforced through `tenant_tables` + RLS auto-policy.
1924            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
1925            AlterOperation::SetAppendOnly(on) => {
1926                contract.append_only = *on;
1927            }
1928            // VCS opt-in is persisted to red_vcs_settings by the
1929            // executor, not the contract — nothing to do here.
1930            AlterOperation::SetVersioned(_) => {}
1931            AlterOperation::EnableEvents(subscription) => {
1932                let mut subscription = subscription.clone();
1933                subscription.source = contract.name.clone();
1934                subscription.enabled = true;
1935                if let Some(existing) = contract
1936                    .subscriptions
1937                    .iter_mut()
1938                    .find(|existing| existing.target_queue == subscription.target_queue)
1939                {
1940                    *existing = subscription;
1941                } else {
1942                    contract.subscriptions.push(subscription);
1943                }
1944            }
1945            AlterOperation::DisableEvents => {
1946                for subscription in &mut contract.subscriptions {
1947                    subscription.enabled = false;
1948                }
1949            }
1950            AlterOperation::AddSubscription { name, descriptor } => {
1951                let mut sub = descriptor.clone();
1952                sub.name = name.clone();
1953                sub.source = contract.name.clone();
1954                sub.enabled = true;
1955                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
1956                {
1957                    *existing = sub;
1958                } else {
1959                    contract.subscriptions.push(sub);
1960                }
1961            }
1962            AlterOperation::DropSubscription { name } => {
1963                contract.subscriptions.retain(|s| s.name != *name);
1964            }
1965            // Signer registry mutations live in `red_config` outside the
1966            // contract surface — the executor applied them directly via
1967            // `signed_writes_kind::{add,revoke}_signer`. Nothing to fold
1968            // into the column-shaped contract.
1969            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
1970            AlterOperation::SetRetention { duration_ms } => {
1971                contract.retention_duration_ms = Some(*duration_ms);
1972            }
1973            AlterOperation::UnsetRetention => {
1974                contract.retention_duration_ms = None;
1975            }
1976        }
1977    }
1978}
1979
1980/// Issue #580 — returns true if the contract carries at least one
1981/// column the retention filter can use as a timestamp anchor:
1982/// either `WITH timestamps = true` (auto `created_at` / `updated_at`)
1983/// or a user-declared column with a temporal data_type.
1984pub(crate) fn retention_timestamp_column_exists(
1985    contract: &crate::physical::CollectionContract,
1986) -> bool {
1987    if contract.timestamps_enabled {
1988        return true;
1989    }
1990    if matches!(
1991        contract.declared_model,
1992        crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
1993    ) {
1994        // Time-series and metrics collections carry an intrinsic
1995        // timestamp axis on every row even without a declared column;
1996        // retention has a natural anchor.
1997        return true;
1998    }
1999    contract
2000        .declared_columns
2001        .iter()
2002        .any(|column| is_temporal_data_type(&column.data_type))
2003}
2004
/// Reports whether `data_type` names one of the temporal column types.
///
/// The match is ASCII case-insensitive and exact: surrounding whitespace
/// or any suffix makes the name unrecognised.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPE_NAMES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];
    TEMPORAL_TYPE_NAMES
        .iter()
        .any(|candidate| data_type.eq_ignore_ascii_case(candidate))
}
2012
2013fn validate_event_subscriptions(
2014    runtime: &RedDBRuntime,
2015    source: &str,
2016    subscriptions: &[crate::catalog::SubscriptionDescriptor],
2017) -> RedDBResult<()> {
2018    for subscription in subscriptions
2019        .iter()
2020        .filter(|subscription| subscription.enabled)
2021    {
2022        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
2023            return Err(RedDBError::Query(
2024                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
2025            ));
2026        }
2027        validate_subscription_auth(runtime, source, subscription)?;
2028        if subscription.target_queue == source
2029            || subscription_would_create_cycle(
2030                &runtime.inner.db,
2031                source,
2032                &subscription.target_queue,
2033            )
2034        {
2035            return Err(RedDBError::Query(
2036                "subscription would create cycle".to_string(),
2037            ));
2038        }
2039        audit_subscription_redact_gap(runtime, source, subscription);
2040    }
2041    Ok(())
2042}
2043
2044fn validate_subscription_auth(
2045    runtime: &RedDBRuntime,
2046    source: &str,
2047    subscription: &crate::catalog::SubscriptionDescriptor,
2048) -> RedDBResult<()> {
2049    let auth_store = match runtime.inner.auth_store.read().clone() {
2050        Some(store) => store,
2051        None => return Ok(()),
2052    };
2053    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2054        Some(identity) => identity,
2055        None => return Ok(()),
2056    };
2057    let tenant = crate::runtime::impl_core::current_tenant();
2058    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2059
2060    if auth_store.iam_authorization_enabled() {
2061        let ctx = crate::auth::policies::EvalContext {
2062            principal_tenant: tenant.clone(),
2063            current_tenant: tenant.clone(),
2064            peer_ip: None,
2065            mfa_present: false,
2066            now_ms: crate::auth::now_ms(),
2067            principal_is_admin_role: role == crate::auth::Role::Admin,
2068            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
2069            principal_is_platform_scoped: principal.tenant.is_none(),
2070        };
2071        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2072        if let Some(t) = tenant.as_deref() {
2073            source_resource = source_resource.with_tenant(t.to_string());
2074        }
2075        if !auth_store.check_policy_authz_with_role(
2076            &principal,
2077            "select",
2078            &source_resource,
2079            &ctx,
2080            role,
2081        ) {
2082            return Err(RedDBError::Query(format!(
2083                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2084                principal, source_resource.kind, source_resource.name
2085            )));
2086        }
2087
2088        let mut target_resource =
2089            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2090        if let Some(t) = tenant.as_deref() {
2091            target_resource = target_resource.with_tenant(t.to_string());
2092        }
2093        if !auth_store.check_policy_authz_with_role(
2094            &principal,
2095            "write",
2096            &target_resource,
2097            &ctx,
2098            role,
2099        ) {
2100            return Err(RedDBError::Query(format!(
2101                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2102                principal, target_resource.kind, target_resource.name
2103            )));
2104        }
2105        return Ok(());
2106    }
2107
2108    let ctx = crate::auth::privileges::AuthzContext {
2109        principal: &username,
2110        effective_role: role,
2111        tenant: tenant.as_deref(),
2112    };
2113    auth_store
2114        .check_grant(
2115            &ctx,
2116            crate::auth::privileges::Action::Select,
2117            &crate::auth::privileges::Resource::table_from_name(source),
2118        )
2119        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2120    auth_store
2121        .check_grant(
2122            &ctx,
2123            crate::auth::privileges::Action::Insert,
2124            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2125        )
2126        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2127    Ok(())
2128}
2129
2130fn audit_subscription_redact_gap(
2131    runtime: &RedDBRuntime,
2132    source: &str,
2133    subscription: &crate::catalog::SubscriptionDescriptor,
2134) {
2135    let auth_store = match runtime.inner.auth_store.read().clone() {
2136        Some(store) if store.iam_authorization_enabled() => store,
2137        _ => return,
2138    };
2139    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2140        Some(identity) => identity,
2141        None => return,
2142    };
2143    let tenant = crate::runtime::impl_core::current_tenant();
2144    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2145    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2146    if missing.is_empty() {
2147        return;
2148    }
2149
2150    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2151    tracing::warn!(
2152        target: "reddb::operator",
2153        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2154        source,
2155        subscription.target_queue,
2156        columns
2157    );
2158    let mut event = AuditEvent::builder("subscription_redact_gap")
2159        .principal(username)
2160        .source(AuditAuthSource::System)
2161        .resource(format!(
2162            "subscription:{}->{}",
2163            source, subscription.target_queue
2164        ))
2165        .outcome(Outcome::Success)
2166        .field(AuditFieldEscaper::field("source", source))
2167        .field(AuditFieldEscaper::field(
2168            "target_queue",
2169            subscription.target_queue.clone(),
2170        ))
2171        .field(AuditFieldEscaper::field(
2172            "subscription",
2173            subscription.name.clone(),
2174        ))
2175        .field(AuditFieldEscaper::field("columns", columns))
2176        .field(AuditFieldEscaper::field("role", role.as_str()));
2177    if let Some(t) = tenant {
2178        event = event.tenant(t);
2179    }
2180    runtime.inner.audit_log.record_event(event.build());
2181}
2182
2183fn subscription_redact_gap_columns(
2184    auth_store: &crate::auth::store::AuthStore,
2185    principal: &crate::auth::UserId,
2186    source: &str,
2187    subscription: &crate::catalog::SubscriptionDescriptor,
2188) -> BTreeSet<String> {
2189    let redacted: HashSet<String> = subscription
2190        .redact_fields
2191        .iter()
2192        .map(|field| field.to_ascii_lowercase())
2193        .collect();
2194    auth_store
2195        .effective_policies(principal)
2196        .iter()
2197        .flat_map(|policy| policy.statements.iter())
2198        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2199        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2200        .flat_map(|statement| statement.resources.iter())
2201        .filter_map(|resource| denied_column_for_source(resource, source))
2202        .filter(|column| !redact_covers_column(&redacted, source, column))
2203        .collect()
2204}
2205
2206fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2207    match pattern {
2208        crate::auth::policies::ActionPattern::Wildcard => true,
2209        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2210        crate::auth::policies::ActionPattern::Prefix(prefix) => {
2211            "select".len() > prefix.len() + 1
2212                && "select".starts_with(prefix)
2213                && "select".as_bytes()[prefix.len()] == b':'
2214        }
2215    }
2216}
2217
2218fn denied_column_for_source(
2219    resource: &crate::auth::policies::ResourcePattern,
2220    source: &str,
2221) -> Option<String> {
2222    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2223        return None;
2224    };
2225    if kind != "column" {
2226        return None;
2227    }
2228    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2229    (column.table_resource_name() == source).then_some(column.column)
2230}
2231
/// Reports whether the redact set already hides `column` of `source`.
///
/// A column is covered when the set contains the wildcard `*`, the lowercased
/// bare column name, or the lowercased `source.column` form. Entries in
/// `redacted` are compared verbatim, so callers are expected to have
/// lowercased them beforehand.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(bare.as_str()) {
        return true;
    }
    let scoped = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(scoped.as_str())
}
2237
2238fn subscription_would_create_cycle(
2239    db: &crate::storage::unified::devx::RedDB,
2240    source: &str,
2241    target: &str,
2242) -> bool {
2243    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2244    for contract in db.collection_contracts() {
2245        for subscription in contract
2246            .subscriptions
2247            .into_iter()
2248            .filter(|subscription| subscription.enabled)
2249        {
2250            graph
2251                .entry(subscription.source)
2252                .or_default()
2253                .push(subscription.target_queue);
2254        }
2255    }
2256    graph
2257        .entry(source.to_string())
2258        .or_default()
2259        .push(target.to_string());
2260
2261    let mut stack = vec![target.to_string()];
2262    let mut seen = HashSet::new();
2263    while let Some(node) = stack.pop() {
2264        if node == source {
2265            return true;
2266        }
2267        if !seen.insert(node.clone()) {
2268            continue;
2269        }
2270        if let Some(next) = graph.get(&node) {
2271            stack.extend(next.iter().cloned());
2272        }
2273    }
2274    false
2275}
2276
2277pub(crate) fn ensure_event_target_queue_pub(
2278    runtime: &RedDBRuntime,
2279    queue: &str,
2280) -> RedDBResult<()> {
2281    ensure_event_target_queue(runtime, queue)
2282}
2283
2284fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2285    let store = runtime.inner.db.store();
2286    if store.get_collection(queue).is_some() {
2287        return Ok(());
2288    }
2289    store
2290        .create_collection(queue)
2291        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2292    runtime
2293        .inner
2294        .db
2295        .save_collection_contract(event_queue_collection_contract(queue))
2296        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2297    store.set_config_tree(
2298        &format!("queue.{queue}.mode"),
2299        &crate::serde_json::Value::String("fanout".to_string()),
2300    );
2301    Ok(())
2302}
2303
2304fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2305    let now = current_unix_ms();
2306    crate::physical::CollectionContract {
2307        name: queue.to_string(),
2308        declared_model: crate::catalog::CollectionModel::Queue,
2309        schema_mode: crate::catalog::SchemaMode::Dynamic,
2310        origin: crate::physical::ContractOrigin::Implicit,
2311        version: 1,
2312        created_at_unix_ms: now,
2313        updated_at_unix_ms: now,
2314        default_ttl_ms: None,
2315        vector_dimension: None,
2316        vector_metric: None,
2317        context_index_fields: Vec::new(),
2318        declared_columns: Vec::new(),
2319        table_def: None,
2320        timestamps_enabled: false,
2321        context_index_enabled: false,
2322        metrics_raw_retention_ms: None,
2323        metrics_rollup_policies: Vec::new(),
2324        metrics_tenant_identity: None,
2325        metrics_namespace: None,
2326        append_only: true,
2327        subscriptions: Vec::new(),
2328        session_key: None,
2329        session_gap_ms: None,
2330        retention_duration_ms: None,
2331    }
2332}
2333
2334fn build_table_def_from_create_table(
2335    query: &CreateTableQuery,
2336) -> RedDBResult<crate::storage::schema::TableDef> {
2337    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2338    for column in &query.columns {
2339        if column.primary_key {
2340            table.primary_key.push(column.name.clone());
2341            table.constraints.push(
2342                crate::storage::schema::Constraint::new(
2343                    format!("pk_{}", column.name),
2344                    crate::storage::schema::ConstraintType::PrimaryKey,
2345                )
2346                .on_columns(vec![column.name.clone()]),
2347            );
2348        }
2349        if column.unique {
2350            table.constraints.push(
2351                crate::storage::schema::Constraint::new(
2352                    format!("uniq_{}", column.name),
2353                    crate::storage::schema::ConstraintType::Unique,
2354                )
2355                .on_columns(vec![column.name.clone()]),
2356            );
2357        }
2358        if column.not_null {
2359            table.constraints.push(
2360                crate::storage::schema::Constraint::new(
2361                    format!("not_null_{}", column.name),
2362                    crate::storage::schema::ConstraintType::NotNull,
2363                )
2364                .on_columns(vec![column.name.clone()]),
2365            );
2366        }
2367        table.columns.push(column_def_from_ddl(column)?);
2368    }
2369    // WITH timestamps = true: append the two runtime-managed columns
2370    // to the schema so resolved_contract_columns exposes them to the
2371    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
2372    // not-nullable; the write path auto-fills them.
2373    if query.timestamps {
2374        table.columns.push(
2375            crate::storage::schema::ColumnDef::new(
2376                "created_at".to_string(),
2377                crate::storage::schema::DataType::UnsignedInteger,
2378            )
2379            .not_null(),
2380        );
2381        table.columns.push(
2382            crate::storage::schema::ColumnDef::new(
2383                "updated_at".to_string(),
2384                crate::storage::schema::DataType::UnsignedInteger,
2385            )
2386            .not_null(),
2387        );
2388        table.constraints.push(
2389            crate::storage::schema::Constraint::new(
2390                "not_null_created_at".to_string(),
2391                crate::storage::schema::ConstraintType::NotNull,
2392            )
2393            .on_columns(vec!["created_at".to_string()]),
2394        );
2395        table.constraints.push(
2396            crate::storage::schema::Constraint::new(
2397                "not_null_updated_at".to_string(),
2398                crate::storage::schema::ConstraintType::NotNull,
2399            )
2400            .on_columns(vec!["updated_at".to_string()]),
2401        );
2402    }
2403    table
2404        .validate()
2405        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2406    Ok(table)
2407}
2408
2409fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2410    let data_type = resolve_declared_data_type(&column.data_type)
2411        .map_err(|err| RedDBError::Query(err.to_string()))?;
2412    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2413    if column.not_null {
2414        column_def = column_def.not_null();
2415    }
2416    if let Some(default) = &column.default {
2417        column_def = column_def.with_default(default.as_bytes().to_vec());
2418    }
2419    if column.compress.unwrap_or(0) > 0 {
2420        column_def = column_def.compressed();
2421    }
2422    if !column.enum_variants.is_empty() {
2423        column_def = column_def.with_variants(column.enum_variants.clone());
2424    }
2425    if let Some(precision) = column.decimal_precision {
2426        column_def = column_def.with_precision(precision);
2427    }
2428    if let Some(element_type) = &column.array_element {
2429        column_def = column_def.with_element_type(
2430            resolve_declared_data_type(element_type)
2431                .map_err(|err| RedDBError::Query(err.to_string()))?,
2432        );
2433    }
2434    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2435    if column.unique {
2436        column_def = column_def.with_metadata("unique", "true");
2437    }
2438    if column.primary_key {
2439        column_def = column_def.with_metadata("primary_key", "true");
2440    }
2441    Ok(column_def)
2442}
2443
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned.
fn current_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
2450
#[cfg(test)]
mod tests {
    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
    use crate::auth::store::{AuthStore, PrincipalRef};
    use crate::auth::UserId;
    use crate::auth::{AuthConfig, Role};
    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
    use crate::storage::schema::Value;
    use crate::{RedDBOptions, RedDBRuntime};
    use std::sync::Arc;

    /// Builds a single-statement `Allow` policy granting the exact `action`
    /// on `resource`.
    fn make_allow_policy_on(id: &str, action: &str, resource: ResourcePattern) -> Policy {
        Policy {
            id: id.to_string(),
            version: 1,
            tenant: None,
            created_at: 0,
            updated_at: 0,
            statements: vec![Statement {
                sid: None,
                effect: Effect::Allow,
                actions: vec![ActionPattern::Exact(action.to_string())],
                resources: vec![resource],
                condition: None,
            }],
        }
    }

    /// Builds a single-statement `Allow` policy granting the exact `action`
    /// on the named collection.
    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
        make_allow_policy_on(
            id,
            action,
            ResourcePattern::Exact {
                kind: "collection".to_string(),
                name: collection.to_string(),
            },
        )
    }

    /// Installs a fresh default-configured `AuthStore` on the runtime and
    /// returns a handle to it.
    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
        let store = Arc::new(AuthStore::new(AuthConfig::default()));
        *rt.inner.auth_store.write() = Some(store.clone());
        store
    }

    /// A principal whose only attached policy allows `select` is refused
    /// `DROP TABLE` with an IAM-policy denial.
    #[test]
    fn drop_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
        let select_only = make_allow_policy_on("select-only", "select", ResourcePattern::Wildcard);
        store.put_policy_internal(select_only).unwrap();
        let alice = UserId::from_parts(None, "alice");
        store
            .attach_policy(PrincipalRef::User(alice), "select-only")
            .unwrap();
        set_current_auth_identity("alice".to_string(), Role::Write);
        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
        clear_current_auth_identity();
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    /// A policy allowing `drop` on the exact collection lets `DROP TABLE`
    /// succeed.
    #[test]
    fn drop_allowed_with_explicit_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
        store.put_policy_internal(policy).unwrap();
        let bob = UserId::from_parts(None, "bob");
        store
            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
            .unwrap();
        set_current_auth_identity("bob".to_string(), Role::Write);
        rt.execute_query("DROP TABLE bar").unwrap();
        clear_current_auth_identity();
    }

    /// A policy allowing `drop` on the wildcard resource lets `DROP TABLE`
    /// succeed.
    #[test]
    fn drop_allowed_with_wildcard_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        let policy = make_allow_policy_on("allow-drop-all", "drop", ResourcePattern::Wildcard);
        store.put_policy_internal(policy).unwrap();
        let carl = UserId::from_parts(None, "carl");
        store
            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
            .unwrap();
        set_current_auth_identity("carl".to_string(), Role::Write);
        rt.execute_query("DROP TABLE baz").unwrap();
        clear_current_auth_identity();
    }

    /// Under `PolicyOnly` enforcement, a principal whose only attached policy
    /// allows `select` is refused `TRUNCATE TABLE`.
    #[test]
    fn truncate_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        // Acceptance #2 (#712 / S5A): in `policy_only` mode a
        // principal with no matching policy is denied even if their
        // role would have permitted the action. The pre-#712 default
        // of "no policy → deny" only holds under `policy_only`, so
        // pin the mode explicitly here so the assertion holds
        // regardless of the construction-time default.
        store
            .set_enforcement_mode(crate::auth::enforcement_mode::PolicyEnforcementMode::PolicyOnly);
        // A policy exists (IAM active) but gives no truncate right.
        let select_only =
            make_allow_policy_on("select-only-2", "select", ResourcePattern::Wildcard);
        store.put_policy_internal(select_only).unwrap();
        let dana = UserId::from_parts(None, "dana");
        store
            .attach_policy(PrincipalRef::User(dana), "select-only-2")
            .unwrap();
        set_current_auth_identity("dana".to_string(), Role::Write);
        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
        clear_current_auth_identity();
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    /// `TRUNCATE TABLE` empties the table while the contract and the hash
    /// index stay in place and new rows can still be inserted and queried.
    #[test]
    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
            .unwrap();
        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
            .unwrap();

        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
        assert_eq!(truncated.statement_type, "truncate");
        assert_eq!(truncated.affected_rows, 0);

        let empty = rt.execute_query("SELECT id FROM users").unwrap();
        assert!(empty.result.records.is_empty());

        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
            .unwrap();
        let selected = rt
            .execute_query("SELECT name FROM users WHERE id = 3")
            .unwrap();
        let name = selected.result.records[0].get("name").unwrap();
        assert_eq!(name, &Value::text("cy"));
        assert!(rt.db().collection_contract("users").is_some());
        assert!(rt
            .inner
            .index_store
            .list_indices("users")
            .iter()
            .any(|index| index.name == "idx_users_id"));
    }

    /// `TRUNCATE TABLE` on a queue fails with a model mismatch, while
    /// `TRUNCATE COLLECTION` empties it.
    #[test]
    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE QUEUE tasks").unwrap();
        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();

        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));

        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
        assert_eq!(
            len.result.records[0].get("len"),
            Some(&Value::UnsignedInteger(0))
        );
    }

    /// Truncating a `red.*` system collection is rejected as read-only.
    #[test]
    fn truncate_system_schema_is_read_only() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        let err = rt
            .execute_query("TRUNCATE COLLECTION red.collections")
            .unwrap_err();
        assert!(format!("{err}").contains("system schema is read-only"));
    }

    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────

    /// Peeks up to 100 messages from `queue` and decodes each `payload`
    /// column as JSON. Panics when a record has no JSON payload.
    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
        let result = rt
            .execute_query(&format!("QUEUE PEEK {queue} 100"))
            .expect("peek queue");
        result
            .result
            .records
            .iter()
            .map(
                |record| match record.get("payload").expect("payload column") {
                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
                    other => panic!("expected JSON queue payload, got {other:?}"),
                },
            )
            .collect()
    }

    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
    /// `truncate` event, not one delete event per row.
    #[test]
    fn truncate_event_enabled_table_emits_single_truncate_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query(
            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
        )
        .unwrap();

        // Drain the 3 insert events so we start clean.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("TRUNCATE TABLE users").unwrap();

        let events = queue_payloads(&rt, "users_events");
        // Must be exactly 1 truncate event, not 3 delete events.
        assert_eq!(
            events.len(),
            1,
            "expected 1 truncate event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("truncate")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            ev.get("entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(3)
        );
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));
    }

    /// `TRUNCATE users` on a collection without event subscription emits no events.
    #[test]
    fn truncate_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
            .unwrap();
        // No EVENTS subscription — truncate must work without touching any queue.
        rt.execute_query("TRUNCATE TABLE plain").unwrap();
        // No crash, no queue to check. Just verify truncation happened.
        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
        assert!(rows.result.records.is_empty());
    }

    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
    /// `collection_dropped` event. The subscription is removed from the
    /// source contract but the target queue is preserved for consumer drain.
    #[test]
    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .unwrap();

        // Drain insert events so we start clean.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("DROP TABLE users").unwrap();

        // Queue must still exist with 1 collection_dropped event.
        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 collection_dropped event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("collection_dropped")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            ev.get("final_entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(2)
        );
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));

        // Source collection is gone.
        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
        assert!(
            format!("{err}").contains("users"),
            "expected not-found error"
        );
    }

    /// `DROP TABLE users` on a collection without event subscription works
    /// normally with no event emitted.
    #[test]
    fn drop_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("DROP TABLE plain").unwrap();
        // No crash and collection is gone.
        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
        assert!(format!("{err}").contains("plain"));
    }

    // ── #297: ops_filter + WHERE filter ────────────────────────────────────

    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
    #[test]
    fn ops_filter_insert_only_ignores_update_and_delete() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
        )
        .unwrap();
        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
            .unwrap();
        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();

        let events = queue_payloads(&rt, "items_events");
        // Only the INSERT should have fired.
        assert_eq!(
            events.len(),
            1,
            "expected 1 insert event, got {}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }

    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
    #[test]
    fn where_filter_skips_rows_that_do_not_match() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
        )
        .unwrap();

        // This row should generate an event.
        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
            .unwrap();
        // This row should NOT generate an event.
        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
            .unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 event (only active), got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("insert")
        );
        let after = ev.get("after").unwrap().as_object().unwrap();
        assert_eq!(
            after.get("status").and_then(crate::json::Value::as_str),
            Some("active")
        );
    }

    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
    #[test]
    fn ops_filter_and_where_filter_combined() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
        )
        .unwrap();

        // INSERT active → event
        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
            .unwrap();
        // INSERT inactive → no event
        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
            .unwrap();
        // UPDATE row 1 to inactive → after = inactive, no event
        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
            .unwrap();
        // DELETE → ops_filter excludes it
        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "items_events");
        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
        assert_eq!(
            events.len(),
            1,
            "expected 1 event, got {}: {events:?}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }

    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
    #[test]
    fn where_filter_on_delete_checks_before_state() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
            .unwrap();

        // Delete active row → event (before-state was active)
        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
        // Delete inactive row → no event (before-state was inactive)
        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 delete event, got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("delete")
        );
    }

    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────

    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
    #[test]
    fn alter_add_column_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
            .unwrap();
        // The column is now in the contract.
        let contract = rt.db().collection_contract("users").unwrap();
        assert!(
            contract.declared_columns.iter().any(|c| c.name == "phone"),
            "phone column should be in contract"
        );
        // Subscription still enabled after the alter.
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }

    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
    #[test]
    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
        )
        .unwrap();
        // DROP COLUMN — schema change event path exercises, must not error.
        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
            .unwrap();
        let contract = rt.db().collection_contract("items").unwrap();
        assert!(
            !contract.declared_columns.iter().any(|c| c.name == "secret"),
            "secret column should be removed"
        );
        // ENABLE RLS — non-column op, no schema-change event (coverage).
        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
            .unwrap();
        // Re-read the contract so the check below observes the state after
        // the RLS alter rather than the value fetched before it.
        let contract = rt.db().collection_contract("items").unwrap();
        // Collection and subscription still intact.
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }
}