Skip to main content

reddb_server/runtime/
impl_ddl.rs

1//! DDL execution: CREATE TABLE, DROP TABLE, ALTER TABLE via SQL AST
2//!
3//! Translates DDL statements into collection-level operations on the
4//! underlying `UnifiedStore`.  RedDB uses a flexible schema-on-read
5//! model, so column definitions are advisory metadata rather than
6//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Vault KV key under which a vault collection's own (per-collection)
/// master key is stored, e.g. `red.vault.secrets.master_key` for the
/// collection `secrets`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        crate::reserved_fields::ensure_no_reserved_public_item_fields(
37            query.columns.iter().map(|column| column.name.as_str()),
38            &format!("table '{}'", query.name),
39        )?;
40        // Check if the collection already exists.
41        let exists = store.get_collection(&query.name).is_some();
42        if exists {
43            if query.if_not_exists {
44                return Ok(RuntimeQueryResult::ok_message(
45                    raw_query.to_string(),
46                    &format!("table '{}' already exists", query.name),
47                    "create",
48                ));
49            }
50            return Err(RedDBError::Query(format!(
51                "table '{}' already exists",
52                query.name
53            )));
54        }
55
56        // Build and validate the contract before mutating storage so invalid
57        // SQL types / duplicate columns do not leave partial side effects.
58        let contract = collection_contract_from_create_table(query)?;
59        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60        // Create the collection.
61        store
62            .create_collection(&query.name)
63            .map_err(|err| RedDBError::Internal(err.to_string()))?;
64        for subscription in &contract.subscriptions {
65            ensure_event_target_queue(self, &subscription.target_queue)?;
66        }
67        if let Some(default_ttl_ms) = query.default_ttl_ms {
68            self.inner
69                .db
70                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71        }
72        self.inner
73            .db
74            .save_collection_contract(contract)
75            .map_err(|err| RedDBError::Internal(err.to_string()))?;
76        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77            store.set_config_tree(
78                &format!("red.collection_tenants.{}", query.name),
79                &crate::serde_json::Value::String(tenant_id),
80            );
81        }
82        self.inner
83            .db
84            .persist_metadata()
85            .map_err(|err| RedDBError::Internal(err.to_string()))?;
86        self.refresh_table_planner_stats(&query.name);
87        self.invalidate_result_cache();
88        // Issue #120 — feed the create into the schema-vocabulary
89        // reverse index so AskPipeline (#121) sees this collection.
90        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91        self.schema_vocabulary_apply(
92            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93                collection: query.name.clone(),
94                columns,
95                type_tags: Vec::new(),
96                description: None,
97            },
98        );
99        // Partition metadata (Phase 2.2 PG parity).
100        //
101        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
102        // clause, stamp the partition config into `red_config` under
103        // `partition.{table}.{by,column}`. Children are registered separately
104        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
105        if let Some(spec) = &query.partition_by {
106            let kind_str = match spec.kind {
107                crate::storage::query::ast::PartitionKind::Range => "range",
108                crate::storage::query::ast::PartitionKind::List => "list",
109                crate::storage::query::ast::PartitionKind::Hash => "hash",
110            };
111            store.set_config_tree(
112                &format!("partition.{}.by", query.name),
113                &crate::serde_json::Value::String(kind_str.to_string()),
114            );
115            store.set_config_tree(
116                &format!("partition.{}.column", query.name),
117                &crate::serde_json::Value::String(spec.column.clone()),
118            );
119        }
120
121        // Table-scoped multi-tenancy (Phase 2.5.4).
122        //
123        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
124        //   1. Persists the `tenant_tables.{table}.column` marker so
125        //      INSERTs can auto-fill and future opens re-hydrate.
126        //   2. Registers the table in the in-memory `tenant_tables`
127        //      HashMap used by the DML auto-fill path.
128        //   3. Installs an implicit RLS policy equivalent to
129        //      `USING (col = CURRENT_TENANT())` across all actions.
130        //   4. Flips `rls_enabled_tables` on so the policy applies.
131        if let Some(col) = &query.tenant_by {
132            store.set_config_tree(
133                &format!("tenant_tables.{}.column", query.name),
134                &crate::serde_json::Value::String(col.clone()),
135            );
136            self.register_tenant_table(&query.name, col);
137        }
138
139        let ttl_suffix = query
140            .default_ttl_ms
141            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142            .unwrap_or_default();
143
144        let tenant_suffix = query
145            .tenant_by
146            .as_ref()
147            .map(|col| format!(" (tenant-scoped by {col})"))
148            .unwrap_or_default();
149
150        Ok(RuntimeQueryResult::ok_message(
151            raw_query.to_string(),
152            &format!(
153                "table '{}' created{}{}",
154                query.name, ttl_suffix, tenant_suffix
155            ),
156            "create",
157        ))
158    }
159
160    fn execute_create_keyed_collection(
161        &self,
162        raw_query: &str,
163        query: &CreateTableQuery,
164    ) -> RedDBResult<RuntimeQueryResult> {
165        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166        if is_system_schema_name(&query.name) {
167            return Err(RedDBError::Query("system schema is read-only".to_string()));
168        }
169        let store = self.inner.db.store();
170        let label = polymorphic_resolver::model_name(query.collection_model);
171        if store.get_collection(&query.name).is_some() {
172            if query.if_not_exists {
173                return Ok(RuntimeQueryResult::ok_message(
174                    raw_query.to_string(),
175                    &format!("{label} '{}' already exists", query.name),
176                    "create",
177                ));
178            }
179            return Err(RedDBError::Query(format!(
180                "{label} '{}' already exists",
181                query.name
182            )));
183        }
184
185        store
186            .create_collection(&query.name)
187            .map_err(|err| RedDBError::Internal(err.to_string()))?;
188        if query.collection_model == CollectionModel::Vault {
189            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190            let key_scope = if query.vault_own_master_key {
191                "own"
192            } else {
193                "cluster"
194            };
195            store.set_config_tree(
196                &format!("red.vault.{}.key_scope", query.name),
197                &crate::serde_json::Value::String(key_scope.to_string()),
198            );
199            store.set_config_tree(
200                &format!("red.vault.{}.status", query.name),
201                &crate::serde_json::Value::String("sealed".to_string()),
202            );
203        }
204        if query.collection_model == CollectionModel::Metrics {
205            for spec in &query.metrics_rollup_policies {
206                let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207                    .ok_or_else(|| {
208                    RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209                })?;
210                if policy.source != "raw" {
211                    return Err(RedDBError::Query(format!(
212                        "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213                        spec
214                    )));
215                }
216                if !matches!(
217                    policy.aggregation.as_str(),
218                    "avg" | "sum" | "min" | "max" | "count"
219                ) {
220                    return Err(RedDBError::Query(format!(
221                        "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222                        spec
223                    )));
224                }
225            }
226            if let Some(raw_retention_ms) = query.default_ttl_ms {
227                self.inner
228                    .db
229                    .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230                store.set_config_tree(
231                    &format!("red.metrics.{}.raw_retention_ms", query.name),
232                    &crate::serde_json::Value::Number(raw_retention_ms as f64),
233                );
234            }
235            let tenant_identity = query
236                .tenant_by
237                .clone()
238                .unwrap_or_else(|| "current_tenant".to_string());
239            store.set_config_tree(
240                &format!("red.metrics.{}.tenant_identity", query.name),
241                &crate::serde_json::Value::String(tenant_identity),
242            );
243            store.set_config_tree(
244                &format!("red.metrics.{}.namespace", query.name),
245                &crate::serde_json::Value::String("default".to_string()),
246            );
247            if !query.metrics_rollup_policies.is_empty() {
248                store.set_config_tree(
249                    &format!("red.metrics.{}.rollup_policies", query.name),
250                    &crate::serde_json::Value::Array(
251                        query
252                            .metrics_rollup_policies
253                            .iter()
254                            .cloned()
255                            .map(crate::serde_json::Value::String)
256                            .collect(),
257                    ),
258                );
259            }
260        }
261        let contract = if query.collection_model == CollectionModel::Metrics {
262            metrics_collection_contract(query)
263        } else {
264            keyed_collection_contract(&query.name, query.collection_model)
265        };
266        self.inner
267            .db
268            .save_collection_contract(contract)
269            .map_err(|err| RedDBError::Internal(err.to_string()))?;
270        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271            store.set_config_tree(
272                &format!("red.collection_tenants.{}", query.name),
273                &crate::serde_json::Value::String(tenant_id),
274            );
275        }
276        self.inner
277            .db
278            .persist_metadata()
279            .map_err(|err| RedDBError::Internal(err.to_string()))?;
280        self.invalidate_result_cache();
281
282        Ok(RuntimeQueryResult::ok_message(
283            raw_query.to_string(),
284            &format!("{label} '{}' created", query.name),
285            "create",
286        ))
287    }
288
289    pub fn execute_create_collection(
290        &self,
291        raw_query: &str,
292        query: &CreateCollectionQuery,
293    ) -> RedDBResult<RuntimeQueryResult> {
294        let model = match query.kind.as_str() {
295            "graph" => CollectionModel::Graph,
296            "document" => CollectionModel::Document,
297            "metrics" => CollectionModel::Metrics,
298            // KIND blockchain — issue #523 foundation: stored on top of a
299            // Table-shaped collection. The `chain` marker + reserved-column
300            // discipline make the difference. Schema validation, conflict
301            // retries, and `verify_chain` come in later iterations.
302            "blockchain" => CollectionModel::Table,
303            other => {
304                return Err(RedDBError::Query(format!(
305                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
306                )));
307            }
308        };
309        let create = CreateTableQuery {
310            collection_model: model,
311            name: query.name.clone(),
312            columns: Vec::new(),
313            if_not_exists: query.if_not_exists,
314            default_ttl_ms: None,
315            metrics_rollup_policies: Vec::new(),
316            context_index_fields: Vec::new(),
317            context_index_enabled: false,
318            timestamps: false,
319            partition_by: None,
320            tenant_by: None,
321            append_only: false,
322            subscriptions: Vec::new(),
323            vault_own_master_key: false,
324        };
325        let result = self.execute_create_table(raw_query, &create)?;
326        if query.kind == "blockchain" {
327            self.install_blockchain_kind(&query.name)?;
328        }
329        // Issue #522 — wire `SIGNED_BY (...)` into the runtime. The parser
330        // already produces a validated 32-byte pubkey list; installing
331        // the registry stamps the per-collection signer set into
332        // `red_config` so the INSERT path can load it cheaply.
333        if !query.allowed_signers.is_empty() {
334            let actor = crate::runtime::impl_core::current_user_projected()
335                .unwrap_or_else(|| "@system/create-collection".to_string());
336            crate::runtime::signed_writes_kind::install(
337                &*self.inner.db.store(),
338                &query.name,
339                &query.allowed_signers,
340                &actor,
341            );
342        }
343        Ok(result)
344    }
345
346    /// Stamp `red.collection.{name}.kind = "chain"` and append the genesis
347    /// row. Idempotent against `IF NOT EXISTS`: if the collection already
348    /// has a row at height 0 we leave it.
349    fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
350        use crate::runtime::blockchain_kind;
351        use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
352        use std::sync::Arc;
353
354        let store = self.inner.db.store();
355        blockchain_kind::mark_as_chain(&*store, name);
356
357        let existing_tip = blockchain_kind::chain_tip(&*store, name);
358        if existing_tip.height.is_some() {
359            return Ok(());
360        }
361
362        let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
363        let named: std::collections::HashMap<String, crate::storage::schema::Value> =
364            fields.into_iter().collect();
365        let entity = UnifiedEntity::new(
366            EntityId::new(0),
367            EntityKind::TableRow {
368                table: Arc::from(name),
369                row_id: 0,
370            },
371            EntityData::Row(RowData {
372                columns: Vec::new(),
373                named: Some(named),
374                schema: None,
375            }),
376        );
377        store
378            .insert_auto(name, entity)
379            .map_err(|err| RedDBError::Internal(err.to_string()))?;
380        // #524: prime the in-memory tip cache so the chain-tip endpoint and
381        // subsequent INSERTs don't have to scan the collection to find genesis.
382        if let Some(tip) = blockchain_kind::chain_tip_full(&*store, name) {
383            self.inner
384                .chain_tip_cache
385                .lock()
386                .insert(name.to_string(), tip);
387        }
388        Ok(())
389    }
390
391    pub fn execute_create_vector(
392        &self,
393        raw_query: &str,
394        query: &CreateVectorQuery,
395    ) -> RedDBResult<RuntimeQueryResult> {
396        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
397        if is_system_schema_name(&query.name) {
398            return Err(RedDBError::Query("system schema is read-only".to_string()));
399        }
400        let store = self.inner.db.store();
401        if store.get_collection(&query.name).is_some() {
402            if query.if_not_exists {
403                return Ok(RuntimeQueryResult::ok_message(
404                    raw_query.to_string(),
405                    &format!("vector '{}' already exists", query.name),
406                    "create",
407                ));
408            }
409            return Err(RedDBError::Query(format!(
410                "vector '{}' already exists",
411                query.name
412            )));
413        }
414
415        store
416            .create_collection(&query.name)
417            .map_err(|err| RedDBError::Internal(err.to_string()))?;
418        self.inner
419            .db
420            .save_collection_contract(vector_collection_contract(query))
421            .map_err(|err| RedDBError::Internal(err.to_string()))?;
422        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
423            store.set_config_tree(
424                &format!("red.collection_tenants.{}", query.name),
425                &crate::serde_json::Value::String(tenant_id),
426            );
427        }
428        self.inner
429            .db
430            .persist_metadata()
431            .map_err(|err| RedDBError::Internal(err.to_string()))?;
432        self.invalidate_result_cache();
433
434        Ok(RuntimeQueryResult::ok_message(
435            raw_query.to_string(),
436            &format!("vector '{}' created", query.name),
437            "create",
438        ))
439    }
440
441    fn provision_vault_key_material(
442        &self,
443        collection: &str,
444        own_master_key: bool,
445    ) -> RedDBResult<()> {
446        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
447            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
448        })?;
449        if !auth_store.is_vault_backed() {
450            return Err(RedDBError::Query(
451                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
452            ));
453        }
454
455        if auth_store.vault_secret_key().is_none() {
456            let key = crate::auth::store::random_bytes(32);
457            auth_store
458                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
459                .map_err(|err| RedDBError::Query(err.to_string()))?;
460        }
461
462        if own_master_key {
463            let key = crate::auth::store::random_bytes(32);
464            auth_store
465                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
466                .map_err(|err| RedDBError::Query(err.to_string()))?;
467        }
468
469        Ok(())
470    }
471
472    /// Execute DROP TABLE
473    ///
474    /// Drops the collection and all its data from the store.
475    pub fn execute_drop_table(
476        &self,
477        raw_query: &str,
478        query: &DropTableQuery,
479    ) -> RedDBResult<RuntimeQueryResult> {
480        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
481        let store = self.inner.db.store();
482
483        if is_system_schema_name(&query.name) {
484            return Err(RedDBError::Query("system schema is read-only".to_string()));
485        }
486
487        let exists = store.get_collection(&query.name).is_some();
488        if !exists {
489            if query.if_exists {
490                return Ok(RuntimeQueryResult::ok_message(
491                    raw_query.to_string(),
492                    &format!("table '{}' does not exist", query.name),
493                    "drop",
494                ));
495            }
496            return Err(RedDBError::NotFound(format!(
497                "table '{}' not found",
498                query.name
499            )));
500        }
501        let actual =
502            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
503        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
504
505        // Emit 1 collection_dropped event before storage is wiped.
506        // Queue is preserved; subscription is removed with the contract below.
507        let final_count = store
508            .get_collection(&query.name)
509            .map(|manager| manager.query_all(|_| true).len() as u64)
510            .unwrap_or(0);
511        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
512            self,
513            &query.name,
514            final_count,
515        )?;
516
517        let orphaned_indices: Vec<String> = self
518            .inner
519            .index_store
520            .list_indices(&query.name)
521            .into_iter()
522            .map(|index| index.name)
523            .collect();
524        for name in &orphaned_indices {
525            self.inner.index_store.drop_index(name, &query.name);
526        }
527
528        store
529            .drop_collection(&query.name)
530            .map_err(|err| RedDBError::Internal(err.to_string()))?;
531        self.inner.db.invalidate_vector_index(&query.name);
532        self.inner.db.clear_collection_default_ttl_ms(&query.name);
533        self.inner
534            .db
535            .remove_collection_contract(&query.name)
536            .map_err(|err| RedDBError::Internal(err.to_string()))?;
537        self.clear_table_planner_stats(&query.name);
538        self.invalidate_result_cache();
539        // Issue #119: a dropped collection vanishes from every
540        // (tenant, role)'s visible-collections set. Auth is optional in
541        // embedded mode so guard the call.
542        if let Some(store) = self.inner.auth_store.read().clone() {
543            store.invalidate_visible_collections_cache();
544        }
545        self.inner
546            .db
547            .persist_metadata()
548            .map_err(|err| RedDBError::Internal(err.to_string()))?;
549        // Issue #120 — drop both the collection entries *and* every
550        // index entry that was scoped to this collection.  Dropping the
551        // collection wipes columns + collection-name + type-tags +
552        // index hits in one pass via `purge_collection_entries`, so
553        // the explicit `DropIndex` calls would be redundant.
554        self.schema_vocabulary_apply(
555            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
556                collection: query.name.clone(),
557            },
558        );
559
560        Ok(RuntimeQueryResult::ok_message(
561            raw_query.to_string(),
562            &format!("table '{}' dropped", query.name),
563            "drop",
564        ))
565    }
566
567    pub fn execute_drop_graph(
568        &self,
569        raw_query: &str,
570        query: &DropGraphQuery,
571    ) -> RedDBResult<RuntimeQueryResult> {
572        self.execute_drop_typed_collection(
573            raw_query,
574            &query.name,
575            query.if_exists,
576            CollectionModel::Graph,
577            "graph",
578        )
579    }
580
581    pub fn execute_drop_vector(
582        &self,
583        raw_query: &str,
584        query: &DropVectorQuery,
585    ) -> RedDBResult<RuntimeQueryResult> {
586        self.execute_drop_typed_collection(
587            raw_query,
588            &query.name,
589            query.if_exists,
590            CollectionModel::Vector,
591            "vector",
592        )
593    }
594
595    pub fn execute_drop_document(
596        &self,
597        raw_query: &str,
598        query: &DropDocumentQuery,
599    ) -> RedDBResult<RuntimeQueryResult> {
600        self.execute_drop_typed_collection(
601            raw_query,
602            &query.name,
603            query.if_exists,
604            CollectionModel::Document,
605            "document",
606        )
607    }
608
609    pub fn execute_drop_kv(
610        &self,
611        raw_query: &str,
612        query: &DropKvQuery,
613    ) -> RedDBResult<RuntimeQueryResult> {
614        let label = polymorphic_resolver::model_name(query.model);
615        self.execute_drop_typed_collection(
616            raw_query,
617            &query.name,
618            query.if_exists,
619            query.model,
620            label,
621        )
622    }
623
624    pub fn execute_drop_collection(
625        &self,
626        raw_query: &str,
627        query: &DropCollectionQuery,
628    ) -> RedDBResult<RuntimeQueryResult> {
629        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
630        if is_system_schema_name(&query.name) {
631            return Err(RedDBError::Query("system schema is read-only".to_string()));
632        }
633        let store = self.inner.db.store();
634        if store.get_collection(&query.name).is_none() {
635            if query.if_exists {
636                return Ok(RuntimeQueryResult::ok_message(
637                    raw_query.to_string(),
638                    &format!("collection '{}' does not exist", query.name),
639                    "drop",
640                ));
641            }
642            return Err(RedDBError::NotFound(format!(
643                "collection '{}' not found",
644                query.name
645            )));
646        }
647
648        let actual =
649            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
650        if let Some(expected) = query.model {
651            polymorphic_resolver::ensure_model_match(expected, actual)?;
652        }
653
654        match actual {
655            CollectionModel::Table => self.execute_drop_table(
656                raw_query,
657                &DropTableQuery {
658                    name: query.name.clone(),
659                    if_exists: query.if_exists,
660                },
661            ),
662            CollectionModel::TimeSeries => self.execute_drop_timeseries(
663                raw_query,
664                &DropTimeSeriesQuery {
665                    name: query.name.clone(),
666                    if_exists: query.if_exists,
667                },
668            ),
669            CollectionModel::Queue => self.execute_drop_queue(
670                raw_query,
671                &DropQueueQuery {
672                    name: query.name.clone(),
673                    if_exists: query.if_exists,
674                },
675            ),
676            CollectionModel::Graph => self.execute_drop_graph(
677                raw_query,
678                &DropGraphQuery {
679                    name: query.name.clone(),
680                    if_exists: query.if_exists,
681                },
682            ),
683            CollectionModel::Vector => self.execute_drop_vector(
684                raw_query,
685                &DropVectorQuery {
686                    name: query.name.clone(),
687                    if_exists: query.if_exists,
688                },
689            ),
690            CollectionModel::Document => self.execute_drop_document(
691                raw_query,
692                &DropDocumentQuery {
693                    name: query.name.clone(),
694                    if_exists: query.if_exists,
695                },
696            ),
697            CollectionModel::Kv => self.execute_drop_kv(
698                raw_query,
699                &DropKvQuery {
700                    name: query.name.clone(),
701                    if_exists: query.if_exists,
702                    model: CollectionModel::Kv,
703                },
704            ),
705            CollectionModel::Config => self.execute_drop_kv(
706                raw_query,
707                &DropKvQuery {
708                    name: query.name.clone(),
709                    if_exists: query.if_exists,
710                    model: CollectionModel::Config,
711                },
712            ),
713            CollectionModel::Vault => self.execute_drop_kv(
714                raw_query,
715                &DropKvQuery {
716                    name: query.name.clone(),
717                    if_exists: query.if_exists,
718                    model: CollectionModel::Vault,
719                },
720            ),
721            CollectionModel::Hll => self.execute_probabilistic_command(
722                raw_query,
723                &ProbabilisticCommand::DropHll {
724                    name: query.name.clone(),
725                    if_exists: query.if_exists,
726                },
727            ),
728            CollectionModel::Sketch => self.execute_probabilistic_command(
729                raw_query,
730                &ProbabilisticCommand::DropSketch {
731                    name: query.name.clone(),
732                    if_exists: query.if_exists,
733                },
734            ),
735            CollectionModel::Filter => self.execute_probabilistic_command(
736                raw_query,
737                &ProbabilisticCommand::DropFilter {
738                    name: query.name.clone(),
739                    if_exists: query.if_exists,
740                },
741            ),
742            CollectionModel::Metrics => self.execute_drop_typed_collection(
743                raw_query,
744                &query.name,
745                query.if_exists,
746                CollectionModel::Metrics,
747                "metrics",
748            ),
749            CollectionModel::Mixed => self.execute_drop_typed_collection(
750                raw_query,
751                &query.name,
752                query.if_exists,
753                CollectionModel::Mixed,
754                "collection",
755            ),
756        }
757    }
758
759    /// Execute ALTER TABLE
760    ///
761    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
762    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
763    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
764    pub fn execute_alter_table(
765        &self,
766        raw_query: &str,
767        query: &AlterTableQuery,
768    ) -> RedDBResult<RuntimeQueryResult> {
769        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
770        let store = self.inner.db.store();
771
772        // Verify the table exists.
773        if store.get_collection(&query.name).is_none() {
774            return Err(RedDBError::NotFound(format!(
775                "table '{}' not found",
776                query.name
777            )));
778        }
779
780        let mut messages = Vec::new();
781
782        // Collect column-level changes upfront for schema-change event emission below.
783        let fields_added: Vec<String> = query
784            .operations
785            .iter()
786            .filter_map(|op| {
787                if let AlterOperation::AddColumn(col) = op {
788                    Some(col.name.clone())
789                } else {
790                    None
791                }
792            })
793            .collect();
794        let fields_removed: Vec<String> = query
795            .operations
796            .iter()
797            .filter_map(|op| {
798                if let AlterOperation::DropColumn(name) = op {
799                    Some(name.clone())
800                } else {
801                    None
802                }
803            })
804            .collect();
805
806        for op in &query.operations {
807            match op {
808                AlterOperation::AddColumn(col) => {
809                    // Schema-on-read: column will be available on next insert.
810                    messages.push(format!("column '{}' added", col.name));
811                }
812                AlterOperation::DropColumn(name) => {
813                    messages.push(format!("column '{}' dropped", name));
814                }
815                AlterOperation::RenameColumn { from, to } => {
816                    messages.push(format!("column '{}' renamed to '{}'", from, to));
817                }
818                AlterOperation::AttachPartition { child, bound } => {
819                    // Persist child → parent binding in red_config so the
820                    // future planner-side pruner can enumerate children and
821                    // evaluate their bounds.
822                    store.set_config_tree(
823                        &format!("partition.{}.children.{}", query.name, child),
824                        &crate::serde_json::Value::String(bound.clone()),
825                    );
826                    messages.push(format!(
827                        "partition '{child}' attached to '{}' ({bound})",
828                        query.name
829                    ));
830                }
831                AlterOperation::DetachPartition { child } => {
832                    store.set_config_tree(
833                        &format!("partition.{}.children.{}", query.name, child),
834                        &crate::serde_json::Value::Null,
835                    );
836                    messages.push(format!(
837                        "partition '{child}' detached from '{}'",
838                        query.name
839                    ));
840                }
841                AlterOperation::EnableRowLevelSecurity => {
842                    self.inner
843                        .rls_enabled_tables
844                        .write()
845                        .insert(query.name.clone());
846                    // Persist flag so RLS survives restart via red_config.
847                    store.set_config_tree(
848                        &format!("rls.enabled.{}", query.name),
849                        &crate::serde_json::Value::Bool(true),
850                    );
851                    self.invalidate_plan_cache();
852                    messages.push(format!("row level security enabled on '{}'", query.name));
853                }
854                AlterOperation::DisableRowLevelSecurity => {
855                    self.inner.rls_enabled_tables.write().remove(&query.name);
856                    store.set_config_tree(
857                        &format!("rls.enabled.{}", query.name),
858                        &crate::serde_json::Value::Null,
859                    );
860                    self.invalidate_plan_cache();
861                    messages.push(format!("row level security disabled on '{}'", query.name));
862                }
863                // Phase 2.5.4: retrofit tenancy onto an existing table.
864                AlterOperation::EnableTenancy { column } => {
865                    store.set_config_tree(
866                        &format!("tenant_tables.{}.column", query.name),
867                        &crate::serde_json::Value::String(column.clone()),
868                    );
869                    self.register_tenant_table(&query.name, column);
870                    self.invalidate_plan_cache();
871                    messages.push(format!(
872                        "tenancy enabled on '{}' by column '{column}'",
873                        query.name
874                    ));
875                }
876                AlterOperation::DisableTenancy => {
877                    store.set_config_tree(
878                        &format!("tenant_tables.{}.column", query.name),
879                        &crate::serde_json::Value::Null,
880                    );
881                    self.unregister_tenant_table(&query.name);
882                    self.invalidate_plan_cache();
883                    messages.push(format!("tenancy disabled on '{}'", query.name));
884                }
885                AlterOperation::SetAppendOnly(on) => {
886                    // Contract is the single source of truth for the
887                    // UPDATE/DELETE parse-time guard. The flag lands
888                    // below via `apply_alter_operations_to_contract`;
889                    // here we only publish the human-readable message.
890                    messages.push(format!(
891                        "append_only {} on '{}'",
892                        if *on { "enabled" } else { "disabled" },
893                        query.name
894                    ));
895                }
896                AlterOperation::SetVersioned(on) => {
897                    // Opt the collection into (or out of) Git-for-Data.
898                    // Persists a row in red_vcs_settings; next AS OF /
899                    // merge / diff against this table honours the
900                    // flag. Retroactive: existing row versions whose
901                    // xmins are still pinned by commits become
902                    // reachable via AS OF COMMIT immediately.
903                    self.vcs_set_versioned(&query.name, *on)?;
904                    messages.push(format!(
905                        "versioned {} on '{}'",
906                        if *on { "enabled" } else { "disabled" },
907                        query.name
908                    ));
909                }
910                AlterOperation::EnableEvents(subscription) => {
911                    let mut subscription = subscription.clone();
912                    subscription.source = query.name.clone();
913                    validate_event_subscriptions(
914                        self,
915                        &query.name,
916                        std::slice::from_ref(&subscription),
917                    )?;
918                    ensure_event_target_queue(self, &subscription.target_queue)?;
919                    messages.push(format!(
920                        "events enabled on '{}' to '{}'",
921                        query.name, subscription.target_queue
922                    ));
923                }
924                AlterOperation::DisableEvents => {
925                    messages.push(format!("events disabled on '{}'", query.name));
926                }
927                AlterOperation::AddSubscription { name, descriptor } => {
928                    let mut sub = descriptor.clone();
929                    sub.name = name.clone();
930                    sub.source = query.name.clone();
931                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
932                    ensure_event_target_queue(self, &sub.target_queue)?;
933                    messages.push(format!(
934                        "subscription '{}' added on '{}' to '{}'",
935                        name, query.name, sub.target_queue
936                    ));
937                }
938                AlterOperation::DropSubscription { name } => {
939                    messages.push(format!(
940                        "subscription '{}' dropped on '{}'",
941                        name, query.name
942                    ));
943                }
944                AlterOperation::AddSigner { pubkey } => {
945                    // Issue #522 — admin-gated registry mutation. The
946                    // standard DDL `check_write` above gates by role; we
947                    // additionally verify the collection actually has a
948                    // signed-writes registry installed so this isn't a
949                    // covert way to retrofit one (use `CREATE COLLECTION
950                    // ... SIGNED_BY (...)` for that).
951                    if !crate::runtime::signed_writes_kind::is_signed(&*store, &query.name)
952                    {
953                        return Err(RedDBError::Query(format!(
954                            "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
955                             recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
956                            query.name
957                        )));
958                    }
959                    let actor = crate::runtime::impl_core::current_user_projected()
960                        .unwrap_or_else(|| "@system/alter".to_string());
961                    let changed = crate::runtime::signed_writes_kind::add_signer(
962                        &*store, &query.name, *pubkey, &actor,
963                    );
964                    messages.push(format!(
965                        "signer {} on '{}'",
966                        if changed { "added" } else { "already present" },
967                        query.name
968                    ));
969                }
970                AlterOperation::RevokeSigner { pubkey } => {
971                    if !crate::runtime::signed_writes_kind::is_signed(&*store, &query.name)
972                    {
973                        return Err(RedDBError::Query(format!(
974                            "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
975                            query.name
976                        )));
977                    }
978                    let actor = crate::runtime::impl_core::current_user_projected()
979                        .unwrap_or_else(|| "@system/alter".to_string());
980                    let changed = crate::runtime::signed_writes_kind::revoke_signer(
981                        &*store, &query.name, pubkey, &actor,
982                    );
983                    messages.push(format!(
984                        "signer {} on '{}'",
985                        if changed { "revoked" } else { "already revoked" },
986                        query.name
987                    ));
988                }
989                AlterOperation::SetRetention { duration_ms } => {
990                    // Issue #580 — validate that the collection has a
991                    // timestamp column the lazy-on-scan filter can key
992                    // off. Without one there is no anchor for "older
993                    // than now - duration"; reject at ALTER time so the
994                    // policy can never silently hide all rows.
995                    let existing = self.inner.db.collection_contract(&query.name);
996                    let has_ts_column = existing
997                        .as_ref()
998                        .map(retention_timestamp_column_exists)
999                        .unwrap_or(false);
1000                    if !has_ts_column {
1001                        return Err(RedDBError::Query(format!(
1002                            "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1003                             column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1004                             or enable WITH timestamps = true before setting a \
1005                             retention policy",
1006                            query.name
1007                        )));
1008                    }
1009                    messages.push(format!(
1010                        "retention set to {duration_ms} ms on '{}'",
1011                        query.name
1012                    ));
1013                }
1014                AlterOperation::UnsetRetention => {
1015                    messages.push(format!("retention cleared on '{}'", query.name));
1016                }
1017            }
1018        }
1019
1020        let mut contract = self
1021            .inner
1022            .db
1023            .collection_contract(&query.name)
1024            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1025        apply_alter_operations_to_contract(&mut contract, &query.operations);
1026        contract.version = contract.version.saturating_add(1);
1027        contract.updated_at_unix_ms = current_unix_ms();
1028        self.inner
1029            .db
1030            .save_collection_contract(contract)
1031            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1032        // Issue #301 — emit OperatorEvent when column schema changes on a
1033        // collection that has active event subscriptions, so operators know
1034        // downstream consumers may see a different payload shape.
1035        if !fields_added.is_empty() || !fields_removed.is_empty() {
1036            let sub_names: Vec<String> = self
1037                .inner
1038                .db
1039                .collection_contract(&query.name)
1040                .map(|c| {
1041                    c.subscriptions
1042                        .iter()
1043                        .filter(|s| s.enabled)
1044                        .map(|s| s.name.clone())
1045                        .collect()
1046                })
1047                .unwrap_or_default();
1048            if !sub_names.is_empty() {
1049                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1050                    collection: query.name.clone(),
1051                    subscription_names: sub_names.join(", "),
1052                    fields_added: fields_added.join(", "),
1053                    fields_removed: fields_removed.join(", "),
1054                    lsn: self.cdc_current_lsn(),
1055                }
1056                .emit_global();
1057            }
1058        }
1059
1060        self.clear_table_planner_stats(&query.name);
1061        self.invalidate_result_cache();
1062        // Issue #120 — refresh the schema-vocabulary entries from the
1063        // post-ALTER contract. Drop+recreate inside the index keeps
1064        // the invalidation guarantee complete (no stale columns from
1065        // before an ALTER ... DROP COLUMN).
1066        let post_alter_columns: Vec<String> = self
1067            .inner
1068            .db
1069            .collection_contract(&query.name)
1070            .map(|contract| {
1071                contract
1072                    .declared_columns
1073                    .iter()
1074                    .map(|col| col.name.clone())
1075                    .collect()
1076            })
1077            .unwrap_or_default();
1078        self.schema_vocabulary_apply(
1079            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1080                collection: query.name.clone(),
1081                columns: post_alter_columns,
1082                type_tags: Vec::new(),
1083                description: None,
1084            },
1085        );
1086
1087        let message = if messages.is_empty() {
1088            format!("table '{}' altered (no operations)", query.name)
1089        } else {
1090            format!("table '{}' altered: {}", query.name, messages.join(", "))
1091        };
1092
1093        Ok(RuntimeQueryResult::ok_message(
1094            raw_query.to_string(),
1095            &message,
1096            "alter",
1097        ))
1098    }
1099
1100    /// Execute EXPLAIN ALTER FOR CREATE TABLE
1101    ///
1102    /// Pure read: computes the schema diff between the target table's
1103    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
1104    /// and returns it as SQL `ALTER TABLE` text (default) or structured
1105    /// JSON. Never mutates storage.
1106    pub fn execute_explain_alter(
1107        &self,
1108        raw_query: &str,
1109        query: &ExplainAlterQuery,
1110    ) -> RedDBResult<RuntimeQueryResult> {
1111        // Validate the target CREATE TABLE body so syntactically valid
1112        // but semantically broken targets (bad SQL types, duplicate
1113        // columns) are caught here rather than inside the diff engine.
1114        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1115
1116        let current_contract = self.inner.db.collection_contract(&query.target.name);
1117
1118        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1119            .as_ref()
1120            .map(|c| c.declared_columns.clone())
1121            .unwrap_or_default();
1122
1123        let diff = super::schema_diff::compute_column_diff(
1124            &query.target.name,
1125            &current_columns,
1126            &query.target.columns,
1127        );
1128
1129        let rendered = match query.format {
1130            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1131            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1132        };
1133
1134        let format_label = match query.format {
1135            ExplainFormat::Sql => "sql",
1136            ExplainFormat::Json => "json",
1137        };
1138
1139        let columns = vec![
1140            "table".to_string(),
1141            "format".to_string(),
1142            "diff".to_string(),
1143        ];
1144        let row = vec![
1145            ("table".to_string(), Value::text(query.target.name.clone())),
1146            ("format".to_string(), Value::text(format_label.to_string())),
1147            ("diff".to_string(), Value::text(rendered)),
1148        ];
1149
1150        Ok(RuntimeQueryResult::ok_records(
1151            raw_query.to_string(),
1152            columns,
1153            vec![row],
1154            "explain",
1155        ))
1156    }
1157
1158    /// Execute CREATE INDEX
1159    ///
1160    /// Registers a new index on a collection, builds it from existing data,
1161    /// and makes it available to the query executor for O(1) lookups.
1162    pub fn execute_create_index(
1163        &self,
1164        raw_query: &str,
1165        query: &CreateIndexQuery,
1166    ) -> RedDBResult<RuntimeQueryResult> {
1167        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1168        let store = self.inner.db.store();
1169
1170        // Verify the table exists
1171        let manager = store
1172            .get_collection(&query.table)
1173            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1174
1175        let method_kind = match query.method {
1176            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1177            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1178            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1179            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1180        };
1181
1182        // Extract fields from existing entities for indexing. Row
1183        // entities may arrive in either the "named" HashMap layout
1184        // (gRPC `BulkInsertBinary` path) OR the columnar layout
1185        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
1186        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
1187        // leaves `named == None`). Prior to this commit the columnar
1188        // branch returned an empty field list, so `CREATE INDEX` built
1189        // a zero-entity index over HTTP-inserted data even though the
1190        // data was queryable via `SELECT`.
1191        let entities = manager.query_all(|_| true);
1192        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1193            entities
1194                .iter()
1195                .map(|e| {
1196                    let fields = match &e.data {
1197                        crate::storage::EntityData::Row(row) => {
1198                            if let Some(ref named) = row.named {
1199                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1200                            } else if let Some(ref schema) = row.schema {
1201                                // Columnar layout — pair each column
1202                                // with its positional name from the
1203                                // shared schema Arc.
1204                                schema
1205                                    .iter()
1206                                    .zip(row.columns.iter())
1207                                    .map(|(k, v)| (k.clone(), v.clone()))
1208                                    .collect()
1209                            } else {
1210                                Vec::new()
1211                            }
1212                        }
1213                        crate::storage::EntityData::Node(node) => node
1214                            .properties
1215                            .iter()
1216                            .map(|(k, v)| (k.clone(), v.clone()))
1217                            .collect(),
1218                        _ => Vec::new(),
1219                    };
1220                    (e.id, fields)
1221                })
1222                .collect();
1223
1224        // Build the index
1225        let indexed_count = self
1226            .inner
1227            .index_store
1228            .create_index(
1229                &query.name,
1230                &query.table,
1231                &query.columns,
1232                method_kind,
1233                query.unique,
1234                &entity_fields,
1235            )
1236            .map_err(RedDBError::Internal)?;
1237
1238        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1239            &query.table,
1240            &entity_fields,
1241        );
1242        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1243        self.invalidate_plan_cache();
1244
1245        // Register metadata
1246        self.inner
1247            .index_store
1248            .register(super::index_store::RegisteredIndex {
1249                name: query.name.clone(),
1250                collection: query.table.clone(),
1251                columns: query.columns.clone(),
1252                method: method_kind,
1253                unique: query.unique,
1254            });
1255        // Issue #120 — surface the index name + indexed columns in
1256        // the schema-vocabulary so AskPipeline (#121) can resolve
1257        // "the email index" back to its collection.
1258        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1259            collection: query.table.clone(),
1260            index: query.name.clone(),
1261            columns: query.columns.clone(),
1262        });
1263
1264        let method_str = format!("{}", query.method);
1265        let unique_str = if query.unique { "unique " } else { "" };
1266        let cols = query.columns.join(", ");
1267
1268        Ok(RuntimeQueryResult::ok_message(
1269            raw_query.to_string(),
1270            &format!(
1271                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1272                unique_str, query.name, query.table, cols, method_str, indexed_count
1273            ),
1274            "create",
1275        ))
1276    }
1277
1278    /// Execute DROP INDEX
1279    ///
1280    /// Removes an index from a collection.
1281    pub fn execute_drop_index(
1282        &self,
1283        raw_query: &str,
1284        query: &DropIndexQuery,
1285    ) -> RedDBResult<RuntimeQueryResult> {
1286        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1287        let store = self.inner.db.store();
1288
1289        // Verify the table exists
1290        if store.get_collection(&query.table).is_none() {
1291            if query.if_exists {
1292                return Ok(RuntimeQueryResult::ok_message(
1293                    raw_query.to_string(),
1294                    &format!("table '{}' does not exist", query.table),
1295                    "drop",
1296                ));
1297            }
1298            return Err(RedDBError::NotFound(format!(
1299                "table '{}' not found",
1300                query.table
1301            )));
1302        }
1303
1304        // Remove from IndexStore
1305        self.inner.index_store.drop_index(&query.name, &query.table);
1306        self.invalidate_plan_cache();
1307        // Issue #120 — keep the schema-vocabulary index entry in sync.
1308        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1309            collection: query.table.clone(),
1310            index: query.name.clone(),
1311        });
1312
1313        Ok(RuntimeQueryResult::ok_message(
1314            raw_query.to_string(),
1315            &format!("index '{}' dropped from '{}'", query.name, query.table),
1316            "drop",
1317        ))
1318    }
1319
1320    fn execute_drop_typed_collection(
1321        &self,
1322        raw_query: &str,
1323        name: &str,
1324        if_exists: bool,
1325        expected_model: CollectionModel,
1326        label: &str,
1327    ) -> RedDBResult<RuntimeQueryResult> {
1328        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1329        if is_system_schema_name(name) {
1330            return Err(RedDBError::Query("system schema is read-only".to_string()));
1331        }
1332        let store = self.inner.db.store();
1333        if store.get_collection(name).is_none() {
1334            if if_exists {
1335                return Ok(RuntimeQueryResult::ok_message(
1336                    raw_query.to_string(),
1337                    &format!("{label} '{name}' does not exist"),
1338                    "drop",
1339                ));
1340            }
1341            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1342        }
1343
1344        let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1345        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1346        self.drop_collection_storage(raw_query, name, label)
1347    }
1348
1349    pub fn execute_truncate(
1350        &self,
1351        raw_query: &str,
1352        query: &TruncateQuery,
1353    ) -> RedDBResult<RuntimeQueryResult> {
1354        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1355        if is_system_schema_name(&query.name) {
1356            return Err(RedDBError::Query("system schema is read-only".to_string()));
1357        }
1358
1359        let label = query
1360            .model
1361            .map(polymorphic_resolver::model_name)
1362            .unwrap_or("collection");
1363        let store = self.inner.db.store();
1364        if store.get_collection(&query.name).is_none() {
1365            if query.if_exists {
1366                return Ok(RuntimeQueryResult::ok_message(
1367                    raw_query.to_string(),
1368                    &format!("{label} '{}' does not exist", query.name),
1369                    "truncate",
1370                ));
1371            }
1372            return Err(RedDBError::NotFound(format!(
1373                "{label} '{}' not found",
1374                query.name
1375            )));
1376        }
1377
1378        let actual =
1379            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1380        if let Some(expected) = query.model {
1381            polymorphic_resolver::ensure_model_match(expected, actual)?;
1382        }
1383
1384        if actual == CollectionModel::Queue {
1385            return self.execute_queue_command(
1386                raw_query,
1387                &QueueCommand::Purge {
1388                    queue: query.name.clone(),
1389                },
1390            );
1391        }
1392
1393        // Count before wiping so we can emit the aggregated truncate event.
1394        let affected = self.truncate_collection_entities(&query.name)?;
1395        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1396        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1397        self.inner.db.invalidate_vector_index(&query.name);
1398        self.clear_table_planner_stats(&query.name);
1399        self.invalidate_result_cache();
1400
1401        Ok(RuntimeQueryResult::ok_message(
1402            raw_query.to_string(),
1403            &format!(
1404                "{affected} entities truncated from {label} '{}'",
1405                query.name
1406            ),
1407            "truncate",
1408        ))
1409    }
1410
1411    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1412        let store = self.inner.db.store();
1413        let Some(manager) = store.get_collection(name) else {
1414            return Ok(0);
1415        };
1416        let entities = manager.query_all(|_| true);
1417        if entities.is_empty() {
1418            return Ok(0);
1419        }
1420
1421        for entity in &entities {
1422            let fields = entity_index_fields(&entity.data);
1423            self.inner
1424                .index_store
1425                .index_entity_delete(name, entity.id, &fields)
1426                .map_err(RedDBError::Internal)?;
1427        }
1428
1429        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1430        let deleted_ids = store
1431            .delete_batch(name, &ids)
1432            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1433        for id in &deleted_ids {
1434            store.context_index().remove_entity(*id);
1435        }
1436        Ok(deleted_ids.len() as u64)
1437    }
1438
1439    fn drop_collection_storage(
1440        &self,
1441        raw_query: &str,
1442        name: &str,
1443        label: &str,
1444    ) -> RedDBResult<RuntimeQueryResult> {
1445        let store = self.inner.db.store();
1446
1447        // Emit 1 collection_dropped event before storage is wiped.
1448        // Queue is preserved; subscription is removed with the contract below.
1449        let final_count = store
1450            .get_collection(name)
1451            .map(|manager| manager.query_all(|_| true).len() as u64)
1452            .unwrap_or(0);
1453        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
1454            self,
1455            name,
1456            final_count,
1457        )?;
1458
1459        let orphaned_indices: Vec<String> = self
1460            .inner
1461            .index_store
1462            .list_indices(name)
1463            .into_iter()
1464            .map(|index| index.name)
1465            .collect();
1466        for index_name in &orphaned_indices {
1467            self.inner.index_store.drop_index(index_name, name);
1468        }
1469
1470        store
1471            .drop_collection(name)
1472            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1473        self.inner.db.invalidate_vector_index(name);
1474        self.inner.db.clear_collection_default_ttl_ms(name);
1475        self.inner
1476            .db
1477            .remove_collection_contract(name)
1478            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1479        self.clear_table_planner_stats(name);
1480        self.invalidate_result_cache();
1481        if let Some(store) = self.inner.auth_store.read().clone() {
1482            store.invalidate_visible_collections_cache();
1483        }
1484        self.inner
1485            .db
1486            .persist_metadata()
1487            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1488        self.schema_vocabulary_apply(
1489            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
1490                collection: name.to_string(),
1491            },
1492        );
1493
1494        Ok(RuntimeQueryResult::ok_message(
1495            raw_query.to_string(),
1496            &format!("{label} '{name}' dropped"),
1497            "drop",
1498        ))
1499    }
1500}
1501
/// Reports whether `name` lives in the reserved system namespace. That covers
/// exactly `red`, anything under the `red.` prefix, and anything prefixed with
/// `__red_schema_`.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const SYSTEM_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];
    if name == "red" {
        return true;
    }
    SYSTEM_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1505
1506fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1507    match data {
1508        EntityData::Row(row) => {
1509            if let Some(ref named) = row.named {
1510                named
1511                    .iter()
1512                    .map(|(key, value)| (key.clone(), value.clone()))
1513                    .collect()
1514            } else if let Some(ref schema) = row.schema {
1515                schema
1516                    .iter()
1517                    .zip(row.columns.iter())
1518                    .map(|(key, value)| (key.clone(), value.clone()))
1519                    .collect()
1520            } else {
1521                Vec::new()
1522            }
1523        }
1524        EntityData::Node(node) => node
1525            .properties
1526            .iter()
1527            .map(|(key, value)| (key.clone(), value.clone()))
1528            .collect(),
1529        _ => Vec::new(),
1530    }
1531}
1532
1533fn collection_contract_from_create_table(
1534    query: &CreateTableQuery,
1535) -> RedDBResult<crate::physical::CollectionContract> {
1536    let now = current_unix_ms();
1537    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1538        .columns
1539        .iter()
1540        .map(declared_column_contract_from_ddl)
1541        .collect();
1542    if query.timestamps {
1543        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1544        // columns that the write path populates from
1545        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1546        declared_columns.push(crate::physical::DeclaredColumnContract {
1547            name: "created_at".to_string(),
1548            data_type: "BIGINT".to_string(),
1549            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1550            not_null: true,
1551            default: None,
1552            compress: None,
1553            unique: false,
1554            primary_key: false,
1555            enum_variants: Vec::new(),
1556            array_element: None,
1557            decimal_precision: None,
1558        });
1559        declared_columns.push(crate::physical::DeclaredColumnContract {
1560            name: "updated_at".to_string(),
1561            data_type: "BIGINT".to_string(),
1562            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1563            not_null: true,
1564            default: None,
1565            compress: None,
1566            unique: false,
1567            primary_key: false,
1568            enum_variants: Vec::new(),
1569            array_element: None,
1570            decimal_precision: None,
1571        });
1572    }
1573    Ok(crate::physical::CollectionContract {
1574        name: query.name.clone(),
1575        declared_model: crate::catalog::CollectionModel::Table,
1576        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1577        origin: crate::physical::ContractOrigin::Explicit,
1578        version: 1,
1579        created_at_unix_ms: now,
1580        updated_at_unix_ms: now,
1581        default_ttl_ms: query.default_ttl_ms,
1582        vector_dimension: None,
1583        vector_metric: None,
1584        context_index_fields: query.context_index_fields.clone(),
1585        declared_columns,
1586        table_def: Some(build_table_def_from_create_table(query)?),
1587        timestamps_enabled: query.timestamps,
1588        context_index_enabled: query.context_index_enabled
1589            || !query.context_index_fields.is_empty(),
1590        metrics_raw_retention_ms: None,
1591        metrics_rollup_policies: Vec::new(),
1592        metrics_tenant_identity: None,
1593        metrics_namespace: None,
1594        append_only: query.append_only,
1595        subscriptions: query.subscriptions.clone(),
1596        session_key: None,
1597        session_gap_ms: None,
1598        retention_duration_ms: None,
1599    })
1600}
1601
1602fn default_collection_contract_for_existing_table(
1603    name: &str,
1604) -> crate::physical::CollectionContract {
1605    let now = current_unix_ms();
1606    crate::physical::CollectionContract {
1607        name: name.to_string(),
1608        declared_model: crate::catalog::CollectionModel::Table,
1609        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1610        origin: crate::physical::ContractOrigin::Explicit,
1611        version: 0,
1612        created_at_unix_ms: now,
1613        updated_at_unix_ms: now,
1614        default_ttl_ms: None,
1615        vector_dimension: None,
1616        vector_metric: None,
1617        context_index_fields: Vec::new(),
1618        declared_columns: Vec::new(),
1619        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1620        timestamps_enabled: false,
1621        context_index_enabled: false,
1622        metrics_raw_retention_ms: None,
1623        metrics_rollup_policies: Vec::new(),
1624        metrics_tenant_identity: None,
1625        metrics_namespace: None,
1626        append_only: false,
1627        subscriptions: Vec::new(),
1628        session_key: None,
1629        session_gap_ms: None,
1630        retention_duration_ms: None,
1631    }
1632}
1633
1634fn keyed_collection_contract(
1635    name: &str,
1636    model: crate::catalog::CollectionModel,
1637) -> crate::physical::CollectionContract {
1638    let now = current_unix_ms();
1639    crate::physical::CollectionContract {
1640        name: name.to_string(),
1641        declared_model: model,
1642        schema_mode: crate::catalog::SchemaMode::Dynamic,
1643        origin: crate::physical::ContractOrigin::Explicit,
1644        version: 1,
1645        created_at_unix_ms: now,
1646        updated_at_unix_ms: now,
1647        default_ttl_ms: None,
1648        vector_dimension: None,
1649        vector_metric: None,
1650        context_index_fields: Vec::new(),
1651        declared_columns: Vec::new(),
1652        table_def: None,
1653        timestamps_enabled: false,
1654        context_index_enabled: false,
1655        metrics_raw_retention_ms: None,
1656        metrics_rollup_policies: Vec::new(),
1657        metrics_tenant_identity: None,
1658        metrics_namespace: None,
1659        append_only: false,
1660        subscriptions: Vec::new(),
1661        session_key: None,
1662        session_gap_ms: None,
1663        retention_duration_ms: None,
1664    }
1665}
1666
1667fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1668    let now = current_unix_ms();
1669    crate::physical::CollectionContract {
1670        name: query.name.clone(),
1671        declared_model: crate::catalog::CollectionModel::Metrics,
1672        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1673        origin: crate::physical::ContractOrigin::Explicit,
1674        version: 1,
1675        created_at_unix_ms: now,
1676        updated_at_unix_ms: now,
1677        default_ttl_ms: query.default_ttl_ms,
1678        vector_dimension: None,
1679        vector_metric: None,
1680        context_index_fields: Vec::new(),
1681        declared_columns: Vec::new(),
1682        table_def: None,
1683        timestamps_enabled: false,
1684        context_index_enabled: false,
1685        metrics_raw_retention_ms: query.default_ttl_ms,
1686        metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1687        metrics_tenant_identity: Some(
1688            query
1689                .tenant_by
1690                .clone()
1691                .unwrap_or_else(|| "current_tenant".to_string()),
1692        ),
1693        metrics_namespace: Some("default".to_string()),
1694        append_only: true,
1695        subscriptions: Vec::new(),
1696        session_key: None,
1697        session_gap_ms: None,
1698        retention_duration_ms: None,
1699    }
1700}
1701
1702fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1703    let now = current_unix_ms();
1704    crate::physical::CollectionContract {
1705        name: query.name.clone(),
1706        declared_model: crate::catalog::CollectionModel::Vector,
1707        schema_mode: crate::catalog::SchemaMode::Dynamic,
1708        origin: crate::physical::ContractOrigin::Explicit,
1709        version: 1,
1710        created_at_unix_ms: now,
1711        updated_at_unix_ms: now,
1712        default_ttl_ms: None,
1713        vector_dimension: Some(query.dimension),
1714        vector_metric: Some(query.metric),
1715        context_index_fields: Vec::new(),
1716        declared_columns: Vec::new(),
1717        table_def: None,
1718        timestamps_enabled: false,
1719        context_index_enabled: false,
1720        metrics_raw_retention_ms: None,
1721        metrics_rollup_policies: Vec::new(),
1722        metrics_tenant_identity: None,
1723        metrics_namespace: None,
1724        append_only: false,
1725        subscriptions: Vec::new(),
1726        session_key: None,
1727        session_gap_ms: None,
1728        retention_duration_ms: None,
1729    }
1730}
1731
1732fn declared_column_contract_from_ddl(
1733    column: &CreateColumnDef,
1734) -> crate::physical::DeclaredColumnContract {
1735    crate::physical::DeclaredColumnContract {
1736        name: column.name.clone(),
1737        data_type: column.data_type.clone(),
1738        sql_type: Some(column.sql_type.clone()),
1739        not_null: column.not_null,
1740        default: column.default.clone(),
1741        compress: column.compress,
1742        unique: column.unique,
1743        primary_key: column.primary_key,
1744        enum_variants: column.enum_variants.clone(),
1745        array_element: column.array_element.clone(),
1746        decimal_precision: column.decimal_precision,
1747    }
1748}
1749
/// Folds `ALTER TABLE` operations into `contract`, applying them in order.
///
/// Column operations update both `declared_columns` and the contract's
/// `TableDef`; an empty `TableDef` is created first if the contract has none.
/// Event-subscription, append-only and retention operations update the
/// matching contract fields. Partition, RLS, tenancy, versioning and signer
/// operations are intentional no-ops here; the per-arm comments say where
/// that state is persisted instead.
fn apply_alter_operations_to_contract(
    contract: &mut crate::physical::CollectionContract,
    operations: &[AlterOperation],
) {
    // Guarantee a TableDef so the column operations below always have one to edit.
    if contract.table_def.is_none() {
        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
    }
    for operation in operations {
        match operation {
            AlterOperation::AddColumn(column) => {
                // Declare the column unless one with the same name is already declared.
                if !contract
                    .declared_columns
                    .iter()
                    .any(|existing| existing.name == column.name)
                {
                    contract
                        .declared_columns
                        .push(declared_column_contract_from_ddl(column));
                }
                // Mirror the column into the TableDef, with its PK / UNIQUE /
                // NOT NULL constraints, unless the TableDef already has it.
                // NOTE(review): when `column_def_from_ddl` fails, the error is
                // dropped and the TableDef is left unchanged, even though
                // `declared_columns` may have been extended above — confirm
                // this divergence is intended.
                if let Some(table_def) = contract.table_def.as_mut() {
                    if table_def.get_column(&column.name).is_none() {
                        if let Ok(column_def) = column_def_from_ddl(column) {
                            if column.primary_key {
                                table_def.primary_key.push(column.name.clone());
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("pk_{}", column.name),
                                        crate::storage::schema::ConstraintType::PrimaryKey,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            if column.unique {
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("uniq_{}", column.name),
                                        crate::storage::schema::ConstraintType::Unique,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            if column.not_null {
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("not_null_{}", column.name),
                                        crate::storage::schema::ConstraintType::NotNull,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            table_def.columns.push(column_def);
                        }
                    }
                }
            }
            AlterOperation::DropColumn(name) => {
                // Remove the column everywhere it appears. Any constraint or
                // index that mentions it is removed whole, multi-column ones
                // included.
                contract
                    .declared_columns
                    .retain(|column| column.name != *name);
                if let Some(table_def) = contract.table_def.as_mut() {
                    if let Some(index) = table_def.column_index(name) {
                        table_def.columns.remove(index);
                    }
                    table_def.primary_key.retain(|column| column != name);
                    table_def.constraints.retain(|constraint| {
                        !constraint.columns.iter().any(|column| column == name)
                    });
                    table_def
                        .indexes
                        .retain(|index| !index.columns.iter().any(|column| column == name));
                }
            }
            AlterOperation::RenameColumn { from, to } => {
                // Renaming onto an already-declared name skips this whole
                // operation. Only `declared_columns` is checked, not the TableDef.
                if contract
                    .declared_columns
                    .iter()
                    .any(|column| column.name == *to)
                {
                    continue;
                }
                if let Some(column) = contract
                    .declared_columns
                    .iter_mut()
                    .find(|column| column.name == *from)
                {
                    column.name = to.clone();
                }
                // Rewrite every TableDef reference: column, primary key,
                // constraint columns, constraint ref_columns and index columns.
                if let Some(table_def) = contract.table_def.as_mut() {
                    if let Some(column) = table_def
                        .columns
                        .iter_mut()
                        .find(|column| column.name == *from)
                    {
                        column.name = to.clone();
                    }
                    for primary_key in &mut table_def.primary_key {
                        if *primary_key == *from {
                            *primary_key = to.clone();
                        }
                    }
                    for constraint in &mut table_def.constraints {
                        for column in &mut constraint.columns {
                            if *column == *from {
                                *column = to.clone();
                            }
                        }
                        // NOTE(review): if `ref_columns` names columns of a
                        // *different* referenced table, renaming them here
                        // may be wrong — confirm against the Constraint model.
                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
                            for column in ref_columns {
                                if *column == *from {
                                    *column = to.clone();
                                }
                            }
                        }
                    }
                    for index in &mut table_def.indexes {
                        for column in &mut index.columns {
                            if *column == *from {
                                *column = to.clone();
                            }
                        }
                    }
                }
            }
            // Partition ops don't touch the column contract — metadata is
            // persisted separately via `red_config.partition.*`.
            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
            // RLS toggles don't touch the column contract — flag is persisted
            // separately via `red_config.rls.enabled.{table}` and enforced
            // through the in-memory `rls_enabled_tables` set.
            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
            // and are enforced through `tenant_tables` + RLS auto-policy.
            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
            AlterOperation::SetAppendOnly(on) => {
                contract.append_only = *on;
            }
            // VCS opt-in is persisted to red_vcs_settings by the
            // executor, not the contract — nothing to do here.
            AlterOperation::SetVersioned(_) => {}
            AlterOperation::EnableEvents(subscription) => {
                // Upsert keyed by target queue. The stored copy is always
                // sourced from this collection and marked enabled.
                let mut subscription = subscription.clone();
                subscription.source = contract.name.clone();
                subscription.enabled = true;
                if let Some(existing) = contract
                    .subscriptions
                    .iter_mut()
                    .find(|existing| existing.target_queue == subscription.target_queue)
                {
                    *existing = subscription;
                } else {
                    contract.subscriptions.push(subscription);
                }
            }
            AlterOperation::DisableEvents => {
                // Keep every subscription but mark each one disabled.
                for subscription in &mut contract.subscriptions {
                    subscription.enabled = false;
                }
            }
            AlterOperation::AddSubscription { name, descriptor } => {
                // Upsert keyed by subscription name. The stored copy takes the
                // given name, is sourced from this collection and is enabled.
                let mut sub = descriptor.clone();
                sub.name = name.clone();
                sub.source = contract.name.clone();
                sub.enabled = true;
                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
                {
                    *existing = sub;
                } else {
                    contract.subscriptions.push(sub);
                }
            }
            AlterOperation::DropSubscription { name } => {
                // Remove the named subscription; a missing name is a no-op.
                contract.subscriptions.retain(|s| s.name != *name);
            }
            // Signer registry mutations live in `red_config` outside the
            // contract surface — the executor applied them directly via
            // `signed_writes_kind::{add,revoke}_signer`. Nothing to fold
            // into the column-shaped contract.
            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
            AlterOperation::SetRetention { duration_ms } => {
                contract.retention_duration_ms = Some(*duration_ms);
            }
            AlterOperation::UnsetRetention => {
                contract.retention_duration_ms = None;
            }
        }
    }
}
1937
1938/// Issue #580 — returns true if the contract carries at least one
1939/// column the retention filter can use as a timestamp anchor:
1940/// either `WITH timestamps = true` (auto `created_at` / `updated_at`)
1941/// or a user-declared column with a temporal data_type.
1942pub(crate) fn retention_timestamp_column_exists(
1943    contract: &crate::physical::CollectionContract,
1944) -> bool {
1945    if contract.timestamps_enabled {
1946        return true;
1947    }
1948    if matches!(
1949        contract.declared_model,
1950        crate::catalog::CollectionModel::TimeSeries
1951            | crate::catalog::CollectionModel::Metrics
1952    ) {
1953        // Time-series and metrics collections carry an intrinsic
1954        // timestamp axis on every row even without a declared column;
1955        // retention has a natural anchor.
1956        return true;
1957    }
1958    contract
1959        .declared_columns
1960        .iter()
1961        .any(|column| is_temporal_data_type(&column.data_type))
1962}
1963
/// Returns `true` when `data_type` names one of the temporal types that can
/// anchor retention: `TIMESTAMP`, `TIMESTAMPMS`, `TIMESTAMP_MS`, `DATETIME` or
/// `DATE`.
///
/// The comparison ignores ASCII case but is otherwise exact, so surrounding
/// whitespace or parameter suffixes (e.g. `TIMESTAMP(3)`) do not match. It
/// compares in place rather than building an uppercased copy, so a call
/// allocates nothing.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];
    TEMPORAL_TYPES
        .iter()
        .any(|candidate| data_type.eq_ignore_ascii_case(candidate))
}
1971
1972fn validate_event_subscriptions(
1973    runtime: &RedDBRuntime,
1974    source: &str,
1975    subscriptions: &[crate::catalog::SubscriptionDescriptor],
1976) -> RedDBResult<()> {
1977    for subscription in subscriptions
1978        .iter()
1979        .filter(|subscription| subscription.enabled)
1980    {
1981        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1982            return Err(RedDBError::Query(
1983                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1984            ));
1985        }
1986        validate_subscription_auth(runtime, source, subscription)?;
1987        if subscription.target_queue == source
1988            || subscription_would_create_cycle(
1989                &runtime.inner.db,
1990                source,
1991                &subscription.target_queue,
1992            )
1993        {
1994            return Err(RedDBError::Query(
1995                "subscription would create cycle".to_string(),
1996            ));
1997        }
1998        audit_subscription_redact_gap(runtime, source, subscription);
1999    }
2000    Ok(())
2001}
2002
2003fn validate_subscription_auth(
2004    runtime: &RedDBRuntime,
2005    source: &str,
2006    subscription: &crate::catalog::SubscriptionDescriptor,
2007) -> RedDBResult<()> {
2008    let auth_store = match runtime.inner.auth_store.read().clone() {
2009        Some(store) => store,
2010        None => return Ok(()),
2011    };
2012    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2013        Some(identity) => identity,
2014        None => return Ok(()),
2015    };
2016    let tenant = crate::runtime::impl_core::current_tenant();
2017    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2018
2019    if auth_store.iam_authorization_enabled() {
2020        let ctx = crate::auth::policies::EvalContext {
2021            principal_tenant: tenant.clone(),
2022            current_tenant: tenant.clone(),
2023            peer_ip: None,
2024            mfa_present: false,
2025            now_ms: crate::auth::now_ms(),
2026            principal_is_admin_role: role == crate::auth::Role::Admin,
2027        };
2028        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
2029        if let Some(t) = tenant.as_deref() {
2030            source_resource = source_resource.with_tenant(t.to_string());
2031        }
2032        if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
2033            return Err(RedDBError::Query(format!(
2034                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
2035                principal, source_resource.kind, source_resource.name
2036            )));
2037        }
2038
2039        let mut target_resource =
2040            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
2041        if let Some(t) = tenant.as_deref() {
2042            target_resource = target_resource.with_tenant(t.to_string());
2043        }
2044        if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
2045            return Err(RedDBError::Query(format!(
2046                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
2047                principal, target_resource.kind, target_resource.name
2048            )));
2049        }
2050        return Ok(());
2051    }
2052
2053    let ctx = crate::auth::privileges::AuthzContext {
2054        principal: &username,
2055        effective_role: role,
2056        tenant: tenant.as_deref(),
2057    };
2058    auth_store
2059        .check_grant(
2060            &ctx,
2061            crate::auth::privileges::Action::Select,
2062            &crate::auth::privileges::Resource::table_from_name(source),
2063        )
2064        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2065    auth_store
2066        .check_grant(
2067            &ctx,
2068            crate::auth::privileges::Action::Insert,
2069            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
2070        )
2071        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
2072    Ok(())
2073}
2074
2075fn audit_subscription_redact_gap(
2076    runtime: &RedDBRuntime,
2077    source: &str,
2078    subscription: &crate::catalog::SubscriptionDescriptor,
2079) {
2080    let auth_store = match runtime.inner.auth_store.read().clone() {
2081        Some(store) if store.iam_authorization_enabled() => store,
2082        _ => return,
2083    };
2084    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2085        Some(identity) => identity,
2086        None => return,
2087    };
2088    let tenant = crate::runtime::impl_core::current_tenant();
2089    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2090    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2091    if missing.is_empty() {
2092        return;
2093    }
2094
2095    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2096    tracing::warn!(
2097        target: "reddb::operator",
2098        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2099        source,
2100        subscription.target_queue,
2101        columns
2102    );
2103    let mut event = AuditEvent::builder("subscription_redact_gap")
2104        .principal(username)
2105        .source(AuditAuthSource::System)
2106        .resource(format!(
2107            "subscription:{}->{}",
2108            source, subscription.target_queue
2109        ))
2110        .outcome(Outcome::Success)
2111        .field(AuditFieldEscaper::field("source", source))
2112        .field(AuditFieldEscaper::field(
2113            "target_queue",
2114            subscription.target_queue.clone(),
2115        ))
2116        .field(AuditFieldEscaper::field(
2117            "subscription",
2118            subscription.name.clone(),
2119        ))
2120        .field(AuditFieldEscaper::field("columns", columns))
2121        .field(AuditFieldEscaper::field("role", role.as_str()));
2122    if let Some(t) = tenant {
2123        event = event.tenant(t);
2124    }
2125    runtime.inner.audit_log.record_event(event.build());
2126}
2127
2128fn subscription_redact_gap_columns(
2129    auth_store: &crate::auth::store::AuthStore,
2130    principal: &crate::auth::UserId,
2131    source: &str,
2132    subscription: &crate::catalog::SubscriptionDescriptor,
2133) -> BTreeSet<String> {
2134    let redacted: HashSet<String> = subscription
2135        .redact_fields
2136        .iter()
2137        .map(|field| field.to_ascii_lowercase())
2138        .collect();
2139    auth_store
2140        .effective_policies(principal)
2141        .iter()
2142        .flat_map(|policy| policy.statements.iter())
2143        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2144        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2145        .flat_map(|statement| statement.resources.iter())
2146        .filter_map(|resource| denied_column_for_source(resource, source))
2147        .filter(|column| !redact_covers_column(&redacted, source, column))
2148        .collect()
2149}
2150
2151fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2152    match pattern {
2153        crate::auth::policies::ActionPattern::Wildcard => true,
2154        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2155        crate::auth::policies::ActionPattern::Prefix(prefix) => {
2156            "select".len() > prefix.len() + 1
2157                && "select".starts_with(prefix)
2158                && "select".as_bytes()[prefix.len()] == b':'
2159        }
2160    }
2161}
2162
2163fn denied_column_for_source(
2164    resource: &crate::auth::policies::ResourcePattern,
2165    source: &str,
2166) -> Option<String> {
2167    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2168        return None;
2169    };
2170    if kind != "column" {
2171        return None;
2172    }
2173    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2174    (column.table_resource_name() == source).then_some(column.column)
2175}
2176
/// Returns `true` when the lowercased redaction set hides `column` of table
/// `source`.
///
/// A column is hidden by a bare `*`, by its own name, or by the qualified
/// `source.column` form. Both `source` and `column` are lowercased before
/// lookup.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
2182
2183fn subscription_would_create_cycle(
2184    db: &crate::storage::unified::devx::RedDB,
2185    source: &str,
2186    target: &str,
2187) -> bool {
2188    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2189    for contract in db.collection_contracts() {
2190        for subscription in contract
2191            .subscriptions
2192            .into_iter()
2193            .filter(|subscription| subscription.enabled)
2194        {
2195            graph
2196                .entry(subscription.source)
2197                .or_default()
2198                .push(subscription.target_queue);
2199        }
2200    }
2201    graph
2202        .entry(source.to_string())
2203        .or_default()
2204        .push(target.to_string());
2205
2206    let mut stack = vec![target.to_string()];
2207    let mut seen = HashSet::new();
2208    while let Some(node) = stack.pop() {
2209        if node == source {
2210            return true;
2211        }
2212        if !seen.insert(node.clone()) {
2213            continue;
2214        }
2215        if let Some(next) = graph.get(&node) {
2216            stack.extend(next.iter().cloned());
2217        }
2218    }
2219    false
2220}
2221
2222pub(crate) fn ensure_event_target_queue_pub(
2223    runtime: &RedDBRuntime,
2224    queue: &str,
2225) -> RedDBResult<()> {
2226    ensure_event_target_queue(runtime, queue)
2227}
2228
2229fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2230    let store = runtime.inner.db.store();
2231    if store.get_collection(queue).is_some() {
2232        return Ok(());
2233    }
2234    store
2235        .create_collection(queue)
2236        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2237    runtime
2238        .inner
2239        .db
2240        .save_collection_contract(event_queue_collection_contract(queue))
2241        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2242    store.set_config_tree(
2243        &format!("queue.{queue}.mode"),
2244        &crate::serde_json::Value::String("fanout".to_string()),
2245    );
2246    Ok(())
2247}
2248
2249fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2250    let now = current_unix_ms();
2251    crate::physical::CollectionContract {
2252        name: queue.to_string(),
2253        declared_model: crate::catalog::CollectionModel::Queue,
2254        schema_mode: crate::catalog::SchemaMode::Dynamic,
2255        origin: crate::physical::ContractOrigin::Implicit,
2256        version: 1,
2257        created_at_unix_ms: now,
2258        updated_at_unix_ms: now,
2259        default_ttl_ms: None,
2260        vector_dimension: None,
2261        vector_metric: None,
2262        context_index_fields: Vec::new(),
2263        declared_columns: Vec::new(),
2264        table_def: None,
2265        timestamps_enabled: false,
2266        context_index_enabled: false,
2267        metrics_raw_retention_ms: None,
2268        metrics_rollup_policies: Vec::new(),
2269        metrics_tenant_identity: None,
2270        metrics_namespace: None,
2271        append_only: true,
2272        subscriptions: Vec::new(),
2273        session_key: None,
2274        session_gap_ms: None,
2275        retention_duration_ms: None,
2276    }
2277}
2278
2279fn build_table_def_from_create_table(
2280    query: &CreateTableQuery,
2281) -> RedDBResult<crate::storage::schema::TableDef> {
2282    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2283    for column in &query.columns {
2284        if column.primary_key {
2285            table.primary_key.push(column.name.clone());
2286            table.constraints.push(
2287                crate::storage::schema::Constraint::new(
2288                    format!("pk_{}", column.name),
2289                    crate::storage::schema::ConstraintType::PrimaryKey,
2290                )
2291                .on_columns(vec![column.name.clone()]),
2292            );
2293        }
2294        if column.unique {
2295            table.constraints.push(
2296                crate::storage::schema::Constraint::new(
2297                    format!("uniq_{}", column.name),
2298                    crate::storage::schema::ConstraintType::Unique,
2299                )
2300                .on_columns(vec![column.name.clone()]),
2301            );
2302        }
2303        if column.not_null {
2304            table.constraints.push(
2305                crate::storage::schema::Constraint::new(
2306                    format!("not_null_{}", column.name),
2307                    crate::storage::schema::ConstraintType::NotNull,
2308                )
2309                .on_columns(vec![column.name.clone()]),
2310            );
2311        }
2312        table.columns.push(column_def_from_ddl(column)?);
2313    }
2314    // WITH timestamps = true: append the two runtime-managed columns
2315    // to the schema so resolved_contract_columns exposes them to the
2316    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
2317    // not-nullable; the write path auto-fills them.
2318    if query.timestamps {
2319        table.columns.push(
2320            crate::storage::schema::ColumnDef::new(
2321                "created_at".to_string(),
2322                crate::storage::schema::DataType::UnsignedInteger,
2323            )
2324            .not_null(),
2325        );
2326        table.columns.push(
2327            crate::storage::schema::ColumnDef::new(
2328                "updated_at".to_string(),
2329                crate::storage::schema::DataType::UnsignedInteger,
2330            )
2331            .not_null(),
2332        );
2333        table.constraints.push(
2334            crate::storage::schema::Constraint::new(
2335                "not_null_created_at".to_string(),
2336                crate::storage::schema::ConstraintType::NotNull,
2337            )
2338            .on_columns(vec!["created_at".to_string()]),
2339        );
2340        table.constraints.push(
2341            crate::storage::schema::Constraint::new(
2342                "not_null_updated_at".to_string(),
2343                crate::storage::schema::ConstraintType::NotNull,
2344            )
2345            .on_columns(vec!["updated_at".to_string()]),
2346        );
2347    }
2348    table
2349        .validate()
2350        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2351    Ok(table)
2352}
2353
2354fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2355    let data_type = resolve_declared_data_type(&column.data_type)
2356        .map_err(|err| RedDBError::Query(err.to_string()))?;
2357    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2358    if column.not_null {
2359        column_def = column_def.not_null();
2360    }
2361    if let Some(default) = &column.default {
2362        column_def = column_def.with_default(default.as_bytes().to_vec());
2363    }
2364    if column.compress.unwrap_or(0) > 0 {
2365        column_def = column_def.compressed();
2366    }
2367    if !column.enum_variants.is_empty() {
2368        column_def = column_def.with_variants(column.enum_variants.clone());
2369    }
2370    if let Some(precision) = column.decimal_precision {
2371        column_def = column_def.with_precision(precision);
2372    }
2373    if let Some(element_type) = &column.array_element {
2374        column_def = column_def.with_element_type(
2375            resolve_declared_data_type(element_type)
2376                .map_err(|err| RedDBError::Query(err.to_string()))?,
2377        );
2378    }
2379    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2380    if column.unique {
2381        column_def = column_def.with_metadata("unique", "true");
2382    }
2383    if column.primary_key {
2384        column_def = column_def.with_metadata("primary_key", "true");
2385    }
2386    Ok(column_def)
2387}
2388
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        // Clock set before 1970: fall back to the epoch itself.
        Err(_) => 0,
    }
}
2395
#[cfg(test)]
mod tests {
    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
    use crate::auth::store::{AuthStore, PrincipalRef};
    use crate::auth::UserId;
    use crate::auth::{AuthConfig, Role};
    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
    use crate::storage::schema::Value;
    use crate::{RedDBOptions, RedDBRuntime};
    use std::sync::Arc;

    /// Builds a version-1, single-statement `Allow` policy granting `action`
    /// on `resource`.
    fn make_policy(id: &str, action: &str, resource: ResourcePattern) -> Policy {
        Policy {
            id: id.to_string(),
            version: 1,
            tenant: None,
            created_at: 0,
            updated_at: 0,
            statements: vec![Statement {
                sid: None,
                effect: Effect::Allow,
                actions: vec![ActionPattern::Exact(action.to_string())],
                resources: vec![resource],
                condition: None,
            }],
        }
    }

    /// `Allow` policy for `action` on one exact collection.
    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
        make_policy(
            id,
            action,
            ResourcePattern::Exact {
                kind: "collection".to_string(),
                name: collection.to_string(),
            },
        )
    }

    /// `Allow` policy for `action` on every resource.
    fn make_wildcard_allow_policy(id: &str, action: &str) -> Policy {
        make_policy(id, action, ResourcePattern::Wildcard)
    }

    /// Installs a fresh `AuthStore` on `rt` and returns it for policy setup.
    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
        let store = Arc::new(AuthStore::new(AuthConfig::default()));
        *rt.inner.auth_store.write() = Some(store.clone());
        store
    }

    // Each IAM test below captures the query result and clears the auth
    // identity *before* unwrapping/asserting, so a failing test cannot leave
    // the identity installed.

    #[test]
    fn drop_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
        store
            .put_policy_internal(make_wildcard_allow_policy("select-only", "select"))
            .unwrap();
        let alice = UserId::from_parts(None, "alice");
        store
            .attach_policy(PrincipalRef::User(alice), "select-only")
            .unwrap();
        set_current_auth_identity("alice".to_string(), Role::Write);
        let result = rt.execute_query("DROP TABLE foo");
        clear_current_auth_identity();
        let err = result.unwrap_err();
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    #[test]
    fn drop_allowed_with_explicit_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
        store.put_policy_internal(policy).unwrap();
        let bob = UserId::from_parts(None, "bob");
        store
            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
            .unwrap();
        set_current_auth_identity("bob".to_string(), Role::Write);
        let result = rt.execute_query("DROP TABLE bar");
        clear_current_auth_identity();
        result.unwrap();
    }

    #[test]
    fn drop_allowed_with_wildcard_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        store
            .put_policy_internal(make_wildcard_allow_policy("allow-drop-all", "drop"))
            .unwrap();
        let carl = UserId::from_parts(None, "carl");
        store
            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
            .unwrap();
        set_current_auth_identity("carl".to_string(), Role::Write);
        let result = rt.execute_query("DROP TABLE baz");
        clear_current_auth_identity();
        result.unwrap();
    }

    #[test]
    fn truncate_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        // A policy exists (IAM active) but gives no truncate right.
        store
            .put_policy_internal(make_wildcard_allow_policy("select-only-2", "select"))
            .unwrap();
        let dana = UserId::from_parts(None, "dana");
        store
            .attach_policy(PrincipalRef::User(dana), "select-only-2")
            .unwrap();
        set_current_auth_identity("dana".to_string(), Role::Write);
        let result = rt.execute_query("TRUNCATE TABLE qux");
        clear_current_auth_identity();
        let err = result.unwrap_err();
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    #[test]
    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
            .unwrap();
        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
            .unwrap();

        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
        assert_eq!(truncated.statement_type, "truncate");
        assert_eq!(truncated.affected_rows, 0);

        let empty = rt.execute_query("SELECT id FROM users").unwrap();
        assert!(empty.result.records.is_empty());

        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
            .unwrap();
        let selected = rt
            .execute_query("SELECT name FROM users WHERE id = 3")
            .unwrap();
        let name = selected.result.records[0].get("name").unwrap();
        assert_eq!(name, &Value::text("cy"));
        assert!(rt.db().collection_contract("users").is_some());
        assert!(rt
            .inner
            .index_store
            .list_indices("users")
            .iter()
            .any(|index| index.name == "idx_users_id"));
    }

    #[test]
    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE QUEUE tasks").unwrap();
        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();

        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
        assert!(
            format!("{err}").contains("model mismatch: expected table, got queue"),
            "got: {err}"
        );

        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
        assert_eq!(
            len.result.records[0].get("len"),
            Some(&Value::UnsignedInteger(0))
        );
    }

    #[test]
    fn truncate_system_schema_is_read_only() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        let err = rt
            .execute_query("TRUNCATE COLLECTION red.collections")
            .unwrap_err();
        assert!(
            format!("{err}").contains("system schema is read-only"),
            "got: {err}"
        );
    }

    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────

    /// Peeks up to 100 messages from `queue` and decodes each `payload` as JSON.
    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
        let result = rt
            .execute_query(&format!("QUEUE PEEK {queue} 100"))
            .expect("peek queue");
        result
            .result
            .records
            .iter()
            .map(
                |record| match record.get("payload").expect("payload column") {
                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
                    other => panic!("expected JSON queue payload, got {other:?}"),
                },
            )
            .collect()
    }

    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
    /// `truncate` event, not one delete event per row.
    #[test]
    fn truncate_event_enabled_table_emits_single_truncate_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query(
            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
        )
        .unwrap();

        // Drain the 3 insert events so we start clean.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("TRUNCATE TABLE users").unwrap();

        let events = queue_payloads(&rt, "users_events");
        // Must be exactly 1 truncate event, not 3 delete events.
        assert_eq!(
            events.len(),
            1,
            "expected 1 truncate event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("truncate")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            ev.get("entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(3)
        );
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));
    }

    /// `TRUNCATE users` on a collection without event subscription emits no events.
    #[test]
    fn truncate_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
            .unwrap();
        // No EVENTS subscription — truncate must work without touching any queue.
        rt.execute_query("TRUNCATE TABLE plain").unwrap();
        // No crash, no queue to check. Just verify truncation happened.
        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
        assert!(rows.result.records.is_empty());
    }

    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
    /// `collection_dropped` event. The subscription is removed from the
    /// source contract but the target queue is preserved for consumer drain.
    #[test]
    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .unwrap();

        // Drain insert events so we start clean.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("DROP TABLE users").unwrap();

        // Queue must still exist with 1 collection_dropped event.
        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 collection_dropped event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("collection_dropped")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            ev.get("final_entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(2)
        );
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));

        // Source collection is gone.
        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
        assert!(
            format!("{err}").contains("users"),
            "expected not-found error, got: {err}"
        );
    }

    /// `DROP TABLE users` on a collection without event subscription works
    /// normally with no event emitted.
    #[test]
    fn drop_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("DROP TABLE plain").unwrap();
        // No crash and collection is gone.
        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
        assert!(format!("{err}").contains("plain"), "got: {err}");
    }

    // ── #297: ops_filter + WHERE filter ────────────────────────────────────

    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
    #[test]
    fn ops_filter_insert_only_ignores_update_and_delete() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
        )
        .unwrap();
        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
            .unwrap();
        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();

        let events = queue_payloads(&rt, "items_events");
        // Only the INSERT should have fired.
        assert_eq!(
            events.len(),
            1,
            "expected 1 insert event, got {}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }

    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
    #[test]
    fn where_filter_skips_rows_that_do_not_match() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
        )
        .unwrap();

        // This row should generate an event.
        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
            .unwrap();
        // This row should NOT generate an event.
        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
            .unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 event (only active), got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("insert")
        );
        let after = ev.get("after").unwrap().as_object().unwrap();
        assert_eq!(
            after.get("status").and_then(crate::json::Value::as_str),
            Some("active")
        );
    }

    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
    #[test]
    fn ops_filter_and_where_filter_combined() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
        )
        .unwrap();

        // INSERT active → event
        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
            .unwrap();
        // INSERT inactive → no event
        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
            .unwrap();
        // UPDATE row 1 to inactive → after = inactive, no event
        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
            .unwrap();
        // DELETE → ops_filter excludes it
        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "items_events");
        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
        assert_eq!(
            events.len(),
            1,
            "expected 1 event, got {}: {events:?}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }

    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
    #[test]
    fn where_filter_on_delete_checks_before_state() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
            .unwrap();

        // Delete active row → event (before-state was active)
        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
        // Delete inactive row → no event (before-state was inactive)
        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 delete event, got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("delete")
        );
    }

    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────

    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
    #[test]
    fn alter_add_column_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
            .unwrap();
        // The column is now in the contract.
        let contract = rt.db().collection_contract("users").unwrap();
        assert!(
            contract.declared_columns.iter().any(|c| c.name == "phone"),
            "phone column should be in contract"
        );
        // Subscription still enabled after the alter.
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }

    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
    #[test]
    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
        )
        .unwrap();
        // DROP COLUMN — schema change event path exercises, must not error.
        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
            .unwrap();
        let contract = rt.db().collection_contract("items").unwrap();
        assert!(
            !contract.declared_columns.iter().any(|c| c.name == "secret"),
            "secret column should be removed"
        );
        // ENABLE RLS — non-column op, no schema-change event (coverage).
        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
            .unwrap();
        // Re-read the contract: the snapshot above predates the RLS alter, so
        // asserting on it would not observe any change the alter made.
        let contract = rt
            .db()
            .collection_contract("items")
            .expect("items contract should survive ENABLE ROW LEVEL SECURITY");
        assert!(
            !contract.declared_columns.iter().any(|c| c.name == "secret"),
            "secret column should stay removed"
        );
        // Collection and subscription still intact.
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }
}