Skip to main content

reddb_server/runtime/
impl_ddl.rs

1//! DDL execution: CREATE TABLE, DROP TABLE, ALTER TABLE via SQL AST
2//!
3//! Translates DDL statements into collection-level operations on the
4//! underlying `UnifiedStore`.  RedDB uses a flexible schema-on-read
5//! model, so column definitions are advisory metadata rather than
6//! rigid constraints.
7
8use super::*;
9use crate::catalog::CollectionModel;
10use crate::runtime::audit_log::{AuditAuthSource, AuditEvent, AuditFieldEscaper, Outcome};
11use crate::runtime::ddl::polymorphic_resolver;
12use crate::storage::query::{analyze_create_table, resolve_declared_data_type, CreateColumnDef};
13use std::collections::{BTreeSet, HashMap, HashSet};
14
/// Vault KV key under which a vault collection's dedicated master key is
/// stored: `red.vault.<collection>.master_key`.
fn vault_master_key_ref(collection: &str) -> String {
    ["red.vault.", collection, ".master_key"].concat()
}
18
19impl RedDBRuntime {
20    /// Execute CREATE TABLE
21    ///
22    /// Creates a new collection in the store.  Column definitions are
23    /// recorded for introspection but do not enforce rigid schema
24    /// constraints.
25    pub fn execute_create_table(
26        &self,
27        raw_query: &str,
28        query: &CreateTableQuery,
29    ) -> RedDBResult<RuntimeQueryResult> {
30        if query.collection_model != CollectionModel::Table {
31            return self.execute_create_keyed_collection(raw_query, query);
32        }
33        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
34        let store = self.inner.db.store();
35        analyze_create_table(query).map_err(|err| RedDBError::Query(err.to_string()))?;
36        crate::reserved_fields::ensure_no_reserved_public_item_fields(
37            query.columns.iter().map(|column| column.name.as_str()),
38            &format!("table '{}'", query.name),
39        )?;
40        // Check if the collection already exists.
41        let exists = store.get_collection(&query.name).is_some();
42        if exists {
43            if query.if_not_exists {
44                return Ok(RuntimeQueryResult::ok_message(
45                    raw_query.to_string(),
46                    &format!("table '{}' already exists", query.name),
47                    "create",
48                ));
49            }
50            return Err(RedDBError::Query(format!(
51                "table '{}' already exists",
52                query.name
53            )));
54        }
55
56        // Build and validate the contract before mutating storage so invalid
57        // SQL types / duplicate columns do not leave partial side effects.
58        let contract = collection_contract_from_create_table(query)?;
59        validate_event_subscriptions(self, &query.name, &contract.subscriptions)?;
60        // Create the collection.
61        store
62            .create_collection(&query.name)
63            .map_err(|err| RedDBError::Internal(err.to_string()))?;
64        for subscription in &contract.subscriptions {
65            ensure_event_target_queue(self, &subscription.target_queue)?;
66        }
67        if let Some(default_ttl_ms) = query.default_ttl_ms {
68            self.inner
69                .db
70                .set_collection_default_ttl_ms(&query.name, default_ttl_ms);
71        }
72        self.inner
73            .db
74            .save_collection_contract(contract)
75            .map_err(|err| RedDBError::Internal(err.to_string()))?;
76        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
77            store.set_config_tree(
78                &format!("red.collection_tenants.{}", query.name),
79                &crate::serde_json::Value::String(tenant_id),
80            );
81        }
82        self.inner
83            .db
84            .persist_metadata()
85            .map_err(|err| RedDBError::Internal(err.to_string()))?;
86        self.refresh_table_planner_stats(&query.name);
87        self.invalidate_result_cache();
88        // Issue #120 — feed the create into the schema-vocabulary
89        // reverse index so AskPipeline (#121) sees this collection.
90        let columns: Vec<String> = query.columns.iter().map(|col| col.name.clone()).collect();
91        self.schema_vocabulary_apply(
92            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
93                collection: query.name.clone(),
94                columns,
95                type_tags: Vec::new(),
96                description: None,
97            },
98        );
99        // Partition metadata (Phase 2.2 PG parity).
100        //
101        // When the CREATE TABLE carries a `PARTITION BY RANGE|LIST|HASH (col)`
102        // clause, stamp the partition config into `red_config` under
103        // `partition.{table}.{by,column}`. Children are registered separately
104        // via `ALTER TABLE parent ATTACH PARTITION child ...`.
105        if let Some(spec) = &query.partition_by {
106            let kind_str = match spec.kind {
107                crate::storage::query::ast::PartitionKind::Range => "range",
108                crate::storage::query::ast::PartitionKind::List => "list",
109                crate::storage::query::ast::PartitionKind::Hash => "hash",
110            };
111            store.set_config_tree(
112                &format!("partition.{}.by", query.name),
113                &crate::serde_json::Value::String(kind_str.to_string()),
114            );
115            store.set_config_tree(
116                &format!("partition.{}.column", query.name),
117                &crate::serde_json::Value::String(spec.column.clone()),
118            );
119        }
120
121        // Table-scoped multi-tenancy (Phase 2.5.4).
122        //
123        // `CREATE TABLE t (...) TENANT BY (col)` declaration:
124        //   1. Persists the `tenant_tables.{table}.column` marker so
125        //      INSERTs can auto-fill and future opens re-hydrate.
126        //   2. Registers the table in the in-memory `tenant_tables`
127        //      HashMap used by the DML auto-fill path.
128        //   3. Installs an implicit RLS policy equivalent to
129        //      `USING (col = CURRENT_TENANT())` across all actions.
130        //   4. Flips `rls_enabled_tables` on so the policy applies.
131        if let Some(col) = &query.tenant_by {
132            store.set_config_tree(
133                &format!("tenant_tables.{}.column", query.name),
134                &crate::serde_json::Value::String(col.clone()),
135            );
136            self.register_tenant_table(&query.name, col);
137        }
138
139        let ttl_suffix = query
140            .default_ttl_ms
141            .map(|ttl_ms| format!(" with default TTL {}ms", ttl_ms))
142            .unwrap_or_default();
143
144        let tenant_suffix = query
145            .tenant_by
146            .as_ref()
147            .map(|col| format!(" (tenant-scoped by {col})"))
148            .unwrap_or_default();
149
150        Ok(RuntimeQueryResult::ok_message(
151            raw_query.to_string(),
152            &format!(
153                "table '{}' created{}{}",
154                query.name, ttl_suffix, tenant_suffix
155            ),
156            "create",
157        ))
158    }
159
160    fn execute_create_keyed_collection(
161        &self,
162        raw_query: &str,
163        query: &CreateTableQuery,
164    ) -> RedDBResult<RuntimeQueryResult> {
165        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
166        if is_system_schema_name(&query.name) {
167            return Err(RedDBError::Query("system schema is read-only".to_string()));
168        }
169        let store = self.inner.db.store();
170        let label = polymorphic_resolver::model_name(query.collection_model);
171        if store.get_collection(&query.name).is_some() {
172            if query.if_not_exists {
173                return Ok(RuntimeQueryResult::ok_message(
174                    raw_query.to_string(),
175                    &format!("{label} '{}' already exists", query.name),
176                    "create",
177                ));
178            }
179            return Err(RedDBError::Query(format!(
180                "{label} '{}' already exists",
181                query.name
182            )));
183        }
184
185        store
186            .create_collection(&query.name)
187            .map_err(|err| RedDBError::Internal(err.to_string()))?;
188        if query.collection_model == CollectionModel::Vault {
189            self.provision_vault_key_material(&query.name, query.vault_own_master_key)?;
190            let key_scope = if query.vault_own_master_key {
191                "own"
192            } else {
193                "cluster"
194            };
195            store.set_config_tree(
196                &format!("red.vault.{}.key_scope", query.name),
197                &crate::serde_json::Value::String(key_scope.to_string()),
198            );
199            store.set_config_tree(
200                &format!("red.vault.{}.status", query.name),
201                &crate::serde_json::Value::String("sealed".to_string()),
202            );
203        }
204        if query.collection_model == CollectionModel::Metrics {
205            for spec in &query.metrics_rollup_policies {
206                let policy = crate::storage::timeseries::retention::DownsamplePolicy::parse(spec)
207                    .ok_or_else(|| {
208                    RedDBError::Query(format!("invalid metrics rollup policy '{}'", spec))
209                })?;
210                if policy.source != "raw" {
211                    return Err(RedDBError::Query(format!(
212                        "invalid metrics rollup policy '{}': metrics v0 rollups must use raw as source",
213                        spec
214                    )));
215                }
216                if !matches!(
217                    policy.aggregation.as_str(),
218                    "avg" | "sum" | "min" | "max" | "count"
219                ) {
220                    return Err(RedDBError::Query(format!(
221                        "invalid metrics rollup policy '{}': supported aggregations are avg, sum, min, max, count",
222                        spec
223                    )));
224                }
225            }
226            if let Some(raw_retention_ms) = query.default_ttl_ms {
227                self.inner
228                    .db
229                    .set_collection_default_ttl_ms(&query.name, raw_retention_ms);
230                store.set_config_tree(
231                    &format!("red.metrics.{}.raw_retention_ms", query.name),
232                    &crate::serde_json::Value::Number(raw_retention_ms as f64),
233                );
234            }
235            let tenant_identity = query
236                .tenant_by
237                .clone()
238                .unwrap_or_else(|| "current_tenant".to_string());
239            store.set_config_tree(
240                &format!("red.metrics.{}.tenant_identity", query.name),
241                &crate::serde_json::Value::String(tenant_identity),
242            );
243            store.set_config_tree(
244                &format!("red.metrics.{}.namespace", query.name),
245                &crate::serde_json::Value::String("default".to_string()),
246            );
247            if !query.metrics_rollup_policies.is_empty() {
248                store.set_config_tree(
249                    &format!("red.metrics.{}.rollup_policies", query.name),
250                    &crate::serde_json::Value::Array(
251                        query
252                            .metrics_rollup_policies
253                            .iter()
254                            .cloned()
255                            .map(crate::serde_json::Value::String)
256                            .collect(),
257                    ),
258                );
259            }
260        }
261        let contract = if query.collection_model == CollectionModel::Metrics {
262            metrics_collection_contract(query)
263        } else {
264            keyed_collection_contract(&query.name, query.collection_model)
265        };
266        self.inner
267            .db
268            .save_collection_contract(contract)
269            .map_err(|err| RedDBError::Internal(err.to_string()))?;
270        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
271            store.set_config_tree(
272                &format!("red.collection_tenants.{}", query.name),
273                &crate::serde_json::Value::String(tenant_id),
274            );
275        }
276        self.inner
277            .db
278            .persist_metadata()
279            .map_err(|err| RedDBError::Internal(err.to_string()))?;
280        self.invalidate_result_cache();
281
282        Ok(RuntimeQueryResult::ok_message(
283            raw_query.to_string(),
284            &format!("{label} '{}' created", query.name),
285            "create",
286        ))
287    }
288
289    pub fn execute_create_collection(
290        &self,
291        raw_query: &str,
292        query: &CreateCollectionQuery,
293    ) -> RedDBResult<RuntimeQueryResult> {
294        let model = match query.kind.as_str() {
295            "graph" => CollectionModel::Graph,
296            "document" => CollectionModel::Document,
297            "metrics" => CollectionModel::Metrics,
298            // KIND blockchain — issue #523 foundation: stored on top of a
299            // Table-shaped collection. The `chain` marker + reserved-column
300            // discipline make the difference. Schema validation, conflict
301            // retries, and `verify_chain` come in later iterations.
302            "blockchain" => CollectionModel::Table,
303            other => {
304                return Err(RedDBError::Query(format!(
305                    "NOT_YET_SUPPORTED: CREATE COLLECTION KIND {other} is not implemented"
306                )));
307            }
308        };
309        let create = CreateTableQuery {
310            collection_model: model,
311            name: query.name.clone(),
312            columns: Vec::new(),
313            if_not_exists: query.if_not_exists,
314            default_ttl_ms: None,
315            metrics_rollup_policies: Vec::new(),
316            context_index_fields: Vec::new(),
317            context_index_enabled: false,
318            timestamps: false,
319            partition_by: None,
320            tenant_by: None,
321            append_only: false,
322            subscriptions: Vec::new(),
323            vault_own_master_key: false,
324        };
325        let result = self.execute_create_table(raw_query, &create)?;
326        if query.kind == "blockchain" {
327            self.install_blockchain_kind(&query.name)?;
328        }
329        // Issue #522 — wire `SIGNED_BY (...)` into the runtime. The parser
330        // already produces a validated 32-byte pubkey list; installing
331        // the registry stamps the per-collection signer set into
332        // `red_config` so the INSERT path can load it cheaply.
333        if !query.allowed_signers.is_empty() {
334            let actor = crate::runtime::impl_core::current_user_projected()
335                .unwrap_or_else(|| "@system/create-collection".to_string());
336            crate::runtime::signed_writes_kind::install(
337                &self.inner.db.store(),
338                &query.name,
339                &query.allowed_signers,
340                &actor,
341            );
342        }
343        Ok(result)
344    }
345
346    /// Stamp `red.collection.{name}.kind = "chain"` and append the genesis
347    /// row. Idempotent against `IF NOT EXISTS`: if the collection already
348    /// has a row at height 0 we leave it.
349    fn install_blockchain_kind(&self, name: &str) -> RedDBResult<()> {
350        use crate::runtime::blockchain_kind;
351        use crate::storage::unified::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
352        use std::sync::Arc;
353
354        let store = self.inner.db.store();
355        blockchain_kind::mark_as_chain(&store, name);
356
357        let existing_tip = blockchain_kind::chain_tip(&store, name);
358        if existing_tip.height.is_some() {
359            return Ok(());
360        }
361
362        let fields = blockchain_kind::genesis_fields(blockchain_kind::now_ms());
363        let named: std::collections::HashMap<String, crate::storage::schema::Value> =
364            fields.into_iter().collect();
365        let entity = UnifiedEntity::new(
366            EntityId::new(0),
367            EntityKind::TableRow {
368                table: Arc::from(name),
369                row_id: 0,
370            },
371            EntityData::Row(RowData {
372                columns: Vec::new(),
373                named: Some(named),
374                schema: None,
375            }),
376        );
377        store
378            .insert_auto(name, entity)
379            .map_err(|err| RedDBError::Internal(err.to_string()))?;
380        // #524: prime the in-memory tip cache so the chain-tip endpoint and
381        // subsequent INSERTs don't have to scan the collection to find genesis.
382        if let Some(tip) = blockchain_kind::chain_tip_full(&store, name) {
383            self.inner
384                .chain_tip_cache
385                .lock()
386                .insert(name.to_string(), tip);
387        }
388        Ok(())
389    }
390
391    pub fn execute_create_vector(
392        &self,
393        raw_query: &str,
394        query: &CreateVectorQuery,
395    ) -> RedDBResult<RuntimeQueryResult> {
396        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
397        if is_system_schema_name(&query.name) {
398            return Err(RedDBError::Query("system schema is read-only".to_string()));
399        }
400        let store = self.inner.db.store();
401        if store.get_collection(&query.name).is_some() {
402            if query.if_not_exists {
403                return Ok(RuntimeQueryResult::ok_message(
404                    raw_query.to_string(),
405                    &format!("vector '{}' already exists", query.name),
406                    "create",
407                ));
408            }
409            return Err(RedDBError::Query(format!(
410                "vector '{}' already exists",
411                query.name
412            )));
413        }
414
415        store
416            .create_collection(&query.name)
417            .map_err(|err| RedDBError::Internal(err.to_string()))?;
418        self.inner
419            .db
420            .save_collection_contract(vector_collection_contract(query))
421            .map_err(|err| RedDBError::Internal(err.to_string()))?;
422        if let Some(tenant_id) = crate::runtime::impl_core::current_tenant() {
423            store.set_config_tree(
424                &format!("red.collection_tenants.{}", query.name),
425                &crate::serde_json::Value::String(tenant_id),
426            );
427        }
428        self.inner
429            .db
430            .persist_metadata()
431            .map_err(|err| RedDBError::Internal(err.to_string()))?;
432        self.invalidate_result_cache();
433
434        Ok(RuntimeQueryResult::ok_message(
435            raw_query.to_string(),
436            &format!("vector '{}' created", query.name),
437            "create",
438        ))
439    }
440
441    fn provision_vault_key_material(
442        &self,
443        collection: &str,
444        own_master_key: bool,
445    ) -> RedDBResult<()> {
446        let auth_store = self.inner.auth_store.read().clone().ok_or_else(|| {
447            RedDBError::Query("CREATE VAULT requires an enabled, unsealed vault".to_string())
448        })?;
449        if !auth_store.is_vault_backed() {
450            return Err(RedDBError::Query(
451                "CREATE VAULT requires an enabled, unsealed vault".to_string(),
452            ));
453        }
454
455        if auth_store.vault_secret_key().is_none() {
456            let key = crate::auth::store::random_bytes(32);
457            auth_store
458                .vault_kv_try_set("red.secret.aes_key".to_string(), hex::encode(key))
459                .map_err(|err| RedDBError::Query(err.to_string()))?;
460        }
461
462        if own_master_key {
463            let key = crate::auth::store::random_bytes(32);
464            auth_store
465                .vault_kv_try_set(vault_master_key_ref(collection), hex::encode(key))
466                .map_err(|err| RedDBError::Query(err.to_string()))?;
467        }
468
469        Ok(())
470    }
471
472    /// Execute DROP TABLE
473    ///
474    /// Drops the collection and all its data from the store.
475    pub fn execute_drop_table(
476        &self,
477        raw_query: &str,
478        query: &DropTableQuery,
479    ) -> RedDBResult<RuntimeQueryResult> {
480        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
481        let store = self.inner.db.store();
482
483        if is_system_schema_name(&query.name) {
484            return Err(RedDBError::Query("system schema is read-only".to_string()));
485        }
486
487        let exists = store.get_collection(&query.name).is_some();
488        if !exists {
489            if query.if_exists {
490                return Ok(RuntimeQueryResult::ok_message(
491                    raw_query.to_string(),
492                    &format!("table '{}' does not exist", query.name),
493                    "drop",
494                ));
495            }
496            return Err(RedDBError::NotFound(format!(
497                "table '{}' not found",
498                query.name
499            )));
500        }
501        let actual =
502            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
503        polymorphic_resolver::ensure_model_match(CollectionModel::Table, actual)?;
504
505        // Emit 1 collection_dropped event before storage is wiped.
506        // Queue is preserved; subscription is removed with the contract below.
507        let final_count = store
508            .get_collection(&query.name)
509            .map(|manager| manager.query_all(|_| true).len() as u64)
510            .unwrap_or(0);
511        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
512            self,
513            &query.name,
514            final_count,
515        )?;
516
517        let orphaned_indices: Vec<String> = self
518            .inner
519            .index_store
520            .list_indices(&query.name)
521            .into_iter()
522            .map(|index| index.name)
523            .collect();
524        for name in &orphaned_indices {
525            self.inner.index_store.drop_index(name, &query.name);
526        }
527
528        store
529            .drop_collection(&query.name)
530            .map_err(|err| RedDBError::Internal(err.to_string()))?;
531        self.inner.db.invalidate_vector_index(&query.name);
532        self.inner.db.clear_collection_default_ttl_ms(&query.name);
533        self.inner
534            .db
535            .remove_collection_contract(&query.name)
536            .map_err(|err| RedDBError::Internal(err.to_string()))?;
537        self.clear_table_planner_stats(&query.name);
538        self.invalidate_result_cache();
539        // Issue #119: a dropped collection vanishes from every
540        // (tenant, role)'s visible-collections set. Auth is optional in
541        // embedded mode so guard the call.
542        if let Some(store) = self.inner.auth_store.read().clone() {
543            store.invalidate_visible_collections_cache();
544        }
545        self.inner
546            .db
547            .persist_metadata()
548            .map_err(|err| RedDBError::Internal(err.to_string()))?;
549        // Issue #120 — drop both the collection entries *and* every
550        // index entry that was scoped to this collection.  Dropping the
551        // collection wipes columns + collection-name + type-tags +
552        // index hits in one pass via `purge_collection_entries`, so
553        // the explicit `DropIndex` calls would be redundant.
554        self.schema_vocabulary_apply(
555            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
556                collection: query.name.clone(),
557            },
558        );
559
560        Ok(RuntimeQueryResult::ok_message(
561            raw_query.to_string(),
562            &format!("table '{}' dropped", query.name),
563            "drop",
564        ))
565    }
566
567    pub fn execute_drop_graph(
568        &self,
569        raw_query: &str,
570        query: &DropGraphQuery,
571    ) -> RedDBResult<RuntimeQueryResult> {
572        self.execute_drop_typed_collection(
573            raw_query,
574            &query.name,
575            query.if_exists,
576            CollectionModel::Graph,
577            "graph",
578        )
579    }
580
581    pub fn execute_drop_vector(
582        &self,
583        raw_query: &str,
584        query: &DropVectorQuery,
585    ) -> RedDBResult<RuntimeQueryResult> {
586        self.execute_drop_typed_collection(
587            raw_query,
588            &query.name,
589            query.if_exists,
590            CollectionModel::Vector,
591            "vector",
592        )
593    }
594
595    pub fn execute_drop_document(
596        &self,
597        raw_query: &str,
598        query: &DropDocumentQuery,
599    ) -> RedDBResult<RuntimeQueryResult> {
600        self.execute_drop_typed_collection(
601            raw_query,
602            &query.name,
603            query.if_exists,
604            CollectionModel::Document,
605            "document",
606        )
607    }
608
609    pub fn execute_drop_kv(
610        &self,
611        raw_query: &str,
612        query: &DropKvQuery,
613    ) -> RedDBResult<RuntimeQueryResult> {
614        let label = polymorphic_resolver::model_name(query.model);
615        self.execute_drop_typed_collection(
616            raw_query,
617            &query.name,
618            query.if_exists,
619            query.model,
620            label,
621        )
622    }
623
624    pub fn execute_drop_collection(
625        &self,
626        raw_query: &str,
627        query: &DropCollectionQuery,
628    ) -> RedDBResult<RuntimeQueryResult> {
629        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
630        if is_system_schema_name(&query.name) {
631            return Err(RedDBError::Query("system schema is read-only".to_string()));
632        }
633        let store = self.inner.db.store();
634        if store.get_collection(&query.name).is_none() {
635            if query.if_exists {
636                return Ok(RuntimeQueryResult::ok_message(
637                    raw_query.to_string(),
638                    &format!("collection '{}' does not exist", query.name),
639                    "drop",
640                ));
641            }
642            return Err(RedDBError::NotFound(format!(
643                "collection '{}' not found",
644                query.name
645            )));
646        }
647
648        let actual =
649            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
650        if let Some(expected) = query.model {
651            polymorphic_resolver::ensure_model_match(expected, actual)?;
652        }
653
654        match actual {
655            CollectionModel::Table => self.execute_drop_table(
656                raw_query,
657                &DropTableQuery {
658                    name: query.name.clone(),
659                    if_exists: query.if_exists,
660                },
661            ),
662            CollectionModel::TimeSeries => self.execute_drop_timeseries(
663                raw_query,
664                &DropTimeSeriesQuery {
665                    name: query.name.clone(),
666                    if_exists: query.if_exists,
667                },
668            ),
669            CollectionModel::Queue => self.execute_drop_queue(
670                raw_query,
671                &DropQueueQuery {
672                    name: query.name.clone(),
673                    if_exists: query.if_exists,
674                },
675            ),
676            CollectionModel::Graph => self.execute_drop_graph(
677                raw_query,
678                &DropGraphQuery {
679                    name: query.name.clone(),
680                    if_exists: query.if_exists,
681                },
682            ),
683            CollectionModel::Vector => self.execute_drop_vector(
684                raw_query,
685                &DropVectorQuery {
686                    name: query.name.clone(),
687                    if_exists: query.if_exists,
688                },
689            ),
690            CollectionModel::Document => self.execute_drop_document(
691                raw_query,
692                &DropDocumentQuery {
693                    name: query.name.clone(),
694                    if_exists: query.if_exists,
695                },
696            ),
697            CollectionModel::Kv => self.execute_drop_kv(
698                raw_query,
699                &DropKvQuery {
700                    name: query.name.clone(),
701                    if_exists: query.if_exists,
702                    model: CollectionModel::Kv,
703                },
704            ),
705            CollectionModel::Config => self.execute_drop_kv(
706                raw_query,
707                &DropKvQuery {
708                    name: query.name.clone(),
709                    if_exists: query.if_exists,
710                    model: CollectionModel::Config,
711                },
712            ),
713            CollectionModel::Vault => self.execute_drop_kv(
714                raw_query,
715                &DropKvQuery {
716                    name: query.name.clone(),
717                    if_exists: query.if_exists,
718                    model: CollectionModel::Vault,
719                },
720            ),
721            CollectionModel::Hll => self.execute_probabilistic_command(
722                raw_query,
723                &ProbabilisticCommand::DropHll {
724                    name: query.name.clone(),
725                    if_exists: query.if_exists,
726                },
727            ),
728            CollectionModel::Sketch => self.execute_probabilistic_command(
729                raw_query,
730                &ProbabilisticCommand::DropSketch {
731                    name: query.name.clone(),
732                    if_exists: query.if_exists,
733                },
734            ),
735            CollectionModel::Filter => self.execute_probabilistic_command(
736                raw_query,
737                &ProbabilisticCommand::DropFilter {
738                    name: query.name.clone(),
739                    if_exists: query.if_exists,
740                },
741            ),
742            CollectionModel::Metrics => self.execute_drop_typed_collection(
743                raw_query,
744                &query.name,
745                query.if_exists,
746                CollectionModel::Metrics,
747                "metrics",
748            ),
749            CollectionModel::Mixed => self.execute_drop_typed_collection(
750                raw_query,
751                &query.name,
752                query.if_exists,
753                CollectionModel::Mixed,
754                "collection",
755            ),
756        }
757    }
758
759    /// Execute ALTER TABLE
760    ///
761    /// In RedDB's schema-on-read model, ALTER TABLE operations are advisory.
762    /// ADD COLUMN records the schema intent, DROP COLUMN removes it, and
763    /// RENAME COLUMN is a metadata rename.  Existing data is not rewritten.
764    pub fn execute_alter_table(
765        &self,
766        raw_query: &str,
767        query: &AlterTableQuery,
768    ) -> RedDBResult<RuntimeQueryResult> {
769        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
770        let store = self.inner.db.store();
771
772        // Verify the table exists.
773        if store.get_collection(&query.name).is_none() {
774            return Err(RedDBError::NotFound(format!(
775                "table '{}' not found",
776                query.name
777            )));
778        }
779
780        let mut messages = Vec::new();
781
782        // Collect column-level changes upfront for schema-change event emission below.
783        let fields_added: Vec<String> = query
784            .operations
785            .iter()
786            .filter_map(|op| {
787                if let AlterOperation::AddColumn(col) = op {
788                    Some(col.name.clone())
789                } else {
790                    None
791                }
792            })
793            .collect();
794        let fields_removed: Vec<String> = query
795            .operations
796            .iter()
797            .filter_map(|op| {
798                if let AlterOperation::DropColumn(name) = op {
799                    Some(name.clone())
800                } else {
801                    None
802                }
803            })
804            .collect();
805
806        for op in &query.operations {
807            match op {
808                AlterOperation::AddColumn(col) => {
809                    // Schema-on-read: column will be available on next insert.
810                    messages.push(format!("column '{}' added", col.name));
811                }
812                AlterOperation::DropColumn(name) => {
813                    messages.push(format!("column '{}' dropped", name));
814                }
815                AlterOperation::RenameColumn { from, to } => {
816                    messages.push(format!("column '{}' renamed to '{}'", from, to));
817                }
818                AlterOperation::AttachPartition { child, bound } => {
819                    // Persist child → parent binding in red_config so the
820                    // future planner-side pruner can enumerate children and
821                    // evaluate their bounds.
822                    store.set_config_tree(
823                        &format!("partition.{}.children.{}", query.name, child),
824                        &crate::serde_json::Value::String(bound.clone()),
825                    );
826                    messages.push(format!(
827                        "partition '{child}' attached to '{}' ({bound})",
828                        query.name
829                    ));
830                }
831                AlterOperation::DetachPartition { child } => {
832                    store.set_config_tree(
833                        &format!("partition.{}.children.{}", query.name, child),
834                        &crate::serde_json::Value::Null,
835                    );
836                    messages.push(format!(
837                        "partition '{child}' detached from '{}'",
838                        query.name
839                    ));
840                }
841                AlterOperation::EnableRowLevelSecurity => {
842                    self.inner
843                        .rls_enabled_tables
844                        .write()
845                        .insert(query.name.clone());
846                    // Persist flag so RLS survives restart via red_config.
847                    store.set_config_tree(
848                        &format!("rls.enabled.{}", query.name),
849                        &crate::serde_json::Value::Bool(true),
850                    );
851                    self.invalidate_plan_cache();
852                    messages.push(format!("row level security enabled on '{}'", query.name));
853                }
854                AlterOperation::DisableRowLevelSecurity => {
855                    self.inner.rls_enabled_tables.write().remove(&query.name);
856                    store.set_config_tree(
857                        &format!("rls.enabled.{}", query.name),
858                        &crate::serde_json::Value::Null,
859                    );
860                    self.invalidate_plan_cache();
861                    messages.push(format!("row level security disabled on '{}'", query.name));
862                }
863                // Phase 2.5.4: retrofit tenancy onto an existing table.
864                AlterOperation::EnableTenancy { column } => {
865                    store.set_config_tree(
866                        &format!("tenant_tables.{}.column", query.name),
867                        &crate::serde_json::Value::String(column.clone()),
868                    );
869                    self.register_tenant_table(&query.name, column);
870                    self.invalidate_plan_cache();
871                    messages.push(format!(
872                        "tenancy enabled on '{}' by column '{column}'",
873                        query.name
874                    ));
875                }
876                AlterOperation::DisableTenancy => {
877                    store.set_config_tree(
878                        &format!("tenant_tables.{}.column", query.name),
879                        &crate::serde_json::Value::Null,
880                    );
881                    self.unregister_tenant_table(&query.name);
882                    self.invalidate_plan_cache();
883                    messages.push(format!("tenancy disabled on '{}'", query.name));
884                }
885                AlterOperation::SetAppendOnly(on) => {
886                    // Contract is the single source of truth for the
887                    // UPDATE/DELETE parse-time guard. The flag lands
888                    // below via `apply_alter_operations_to_contract`;
889                    // here we only publish the human-readable message.
890                    messages.push(format!(
891                        "append_only {} on '{}'",
892                        if *on { "enabled" } else { "disabled" },
893                        query.name
894                    ));
895                }
896                AlterOperation::SetVersioned(on) => {
897                    // Opt the collection into (or out of) Git-for-Data.
898                    // Persists a row in red_vcs_settings; next AS OF /
899                    // merge / diff against this table honours the
900                    // flag. Retroactive: existing row versions whose
901                    // xmins are still pinned by commits become
902                    // reachable via AS OF COMMIT immediately.
903                    self.vcs_set_versioned(&query.name, *on)?;
904                    messages.push(format!(
905                        "versioned {} on '{}'",
906                        if *on { "enabled" } else { "disabled" },
907                        query.name
908                    ));
909                }
910                AlterOperation::EnableEvents(subscription) => {
911                    let mut subscription = subscription.clone();
912                    subscription.source = query.name.clone();
913                    validate_event_subscriptions(
914                        self,
915                        &query.name,
916                        std::slice::from_ref(&subscription),
917                    )?;
918                    ensure_event_target_queue(self, &subscription.target_queue)?;
919                    messages.push(format!(
920                        "events enabled on '{}' to '{}'",
921                        query.name, subscription.target_queue
922                    ));
923                }
924                AlterOperation::DisableEvents => {
925                    messages.push(format!("events disabled on '{}'", query.name));
926                }
927                AlterOperation::AddSubscription { name, descriptor } => {
928                    let mut sub = descriptor.clone();
929                    sub.name = name.clone();
930                    sub.source = query.name.clone();
931                    validate_event_subscriptions(self, &query.name, std::slice::from_ref(&sub))?;
932                    ensure_event_target_queue(self, &sub.target_queue)?;
933                    messages.push(format!(
934                        "subscription '{}' added on '{}' to '{}'",
935                        name, query.name, sub.target_queue
936                    ));
937                }
938                AlterOperation::DropSubscription { name } => {
939                    messages.push(format!(
940                        "subscription '{}' dropped on '{}'",
941                        name, query.name
942                    ));
943                }
944                AlterOperation::AddSigner { pubkey } => {
945                    // Issue #522 — admin-gated registry mutation. The
946                    // standard DDL `check_write` above gates by role; we
947                    // additionally verify the collection actually has a
948                    // signed-writes registry installed so this isn't a
949                    // covert way to retrofit one (use `CREATE COLLECTION
950                    // ... SIGNED_BY (...)` for that).
951                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
952                        return Err(RedDBError::Query(format!(
953                            "ALTER COLLECTION ADD SIGNER: '{}' has no signer registry; \
954                             recreate it with CREATE COLLECTION ... SIGNED_BY (...)",
955                            query.name
956                        )));
957                    }
958                    let actor = crate::runtime::impl_core::current_user_projected()
959                        .unwrap_or_else(|| "@system/alter".to_string());
960                    let changed = crate::runtime::signed_writes_kind::add_signer(
961                        &store,
962                        &query.name,
963                        *pubkey,
964                        &actor,
965                    );
966                    messages.push(format!(
967                        "signer {} on '{}'",
968                        if changed { "added" } else { "already present" },
969                        query.name
970                    ));
971                }
972                AlterOperation::RevokeSigner { pubkey } => {
973                    if !crate::runtime::signed_writes_kind::is_signed(&store, &query.name) {
974                        return Err(RedDBError::Query(format!(
975                            "ALTER COLLECTION REVOKE SIGNER: '{}' has no signer registry",
976                            query.name
977                        )));
978                    }
979                    let actor = crate::runtime::impl_core::current_user_projected()
980                        .unwrap_or_else(|| "@system/alter".to_string());
981                    let changed = crate::runtime::signed_writes_kind::revoke_signer(
982                        &store,
983                        &query.name,
984                        pubkey,
985                        &actor,
986                    );
987                    messages.push(format!(
988                        "signer {} on '{}'",
989                        if changed {
990                            "revoked"
991                        } else {
992                            "already revoked"
993                        },
994                        query.name
995                    ));
996                }
997                AlterOperation::SetRetention { duration_ms } => {
998                    // Issue #580 — validate that the collection has a
999                    // timestamp column the lazy-on-scan filter can key
1000                    // off. Without one there is no anchor for "older
1001                    // than now - duration"; reject at ALTER time so the
1002                    // policy can never silently hide all rows.
1003                    let existing = self.inner.db.collection_contract(&query.name);
1004                    let has_ts_column = existing
1005                        .as_ref()
1006                        .map(retention_timestamp_column_exists)
1007                        .unwrap_or(false);
1008                    if !has_ts_column {
1009                        return Err(RedDBError::Query(format!(
1010                            "ALTER COLLECTION SET RETENTION: '{}' has no timestamp \
1011                             column — declare a TIMESTAMP/TIMESTAMPMS/DATETIME column \
1012                             or enable WITH timestamps = true before setting a \
1013                             retention policy",
1014                            query.name
1015                        )));
1016                    }
1017                    messages.push(format!(
1018                        "retention set to {duration_ms} ms on '{}'",
1019                        query.name
1020                    ));
1021                }
1022                AlterOperation::UnsetRetention => {
1023                    messages.push(format!("retention cleared on '{}'", query.name));
1024                }
1025            }
1026        }
1027
1028        let mut contract = self
1029            .inner
1030            .db
1031            .collection_contract(&query.name)
1032            .unwrap_or_else(|| default_collection_contract_for_existing_table(&query.name));
1033        apply_alter_operations_to_contract(&mut contract, &query.operations);
1034        contract.version = contract.version.saturating_add(1);
1035        contract.updated_at_unix_ms = current_unix_ms();
1036        self.inner
1037            .db
1038            .save_collection_contract(contract)
1039            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1040        // Issue #301 — emit OperatorEvent when column schema changes on a
1041        // collection that has active event subscriptions, so operators know
1042        // downstream consumers may see a different payload shape.
1043        if !fields_added.is_empty() || !fields_removed.is_empty() {
1044            let sub_names: Vec<String> = self
1045                .inner
1046                .db
1047                .collection_contract(&query.name)
1048                .map(|c| {
1049                    c.subscriptions
1050                        .iter()
1051                        .filter(|s| s.enabled)
1052                        .map(|s| s.name.clone())
1053                        .collect()
1054                })
1055                .unwrap_or_default();
1056            if !sub_names.is_empty() {
1057                crate::telemetry::operator_event::OperatorEvent::SubscriptionSchemaChange {
1058                    collection: query.name.clone(),
1059                    subscription_names: sub_names.join(", "),
1060                    fields_added: fields_added.join(", "),
1061                    fields_removed: fields_removed.join(", "),
1062                    lsn: self.cdc_current_lsn(),
1063                }
1064                .emit_global();
1065            }
1066        }
1067
1068        self.clear_table_planner_stats(&query.name);
1069        self.invalidate_result_cache();
1070        // Issue #120 — refresh the schema-vocabulary entries from the
1071        // post-ALTER contract. Drop+recreate inside the index keeps
1072        // the invalidation guarantee complete (no stale columns from
1073        // before an ALTER ... DROP COLUMN).
1074        let post_alter_columns: Vec<String> = self
1075            .inner
1076            .db
1077            .collection_contract(&query.name)
1078            .map(|contract| {
1079                contract
1080                    .declared_columns
1081                    .iter()
1082                    .map(|col| col.name.clone())
1083                    .collect()
1084            })
1085            .unwrap_or_default();
1086        self.schema_vocabulary_apply(
1087            crate::runtime::schema_vocabulary::DdlEvent::AlterCollection {
1088                collection: query.name.clone(),
1089                columns: post_alter_columns,
1090                type_tags: Vec::new(),
1091                description: None,
1092            },
1093        );
1094
1095        let message = if messages.is_empty() {
1096            format!("table '{}' altered (no operations)", query.name)
1097        } else {
1098            format!("table '{}' altered: {}", query.name, messages.join(", "))
1099        };
1100
1101        Ok(RuntimeQueryResult::ok_message(
1102            raw_query.to_string(),
1103            &message,
1104            "alter",
1105        ))
1106    }
1107
1108    /// Execute EXPLAIN ALTER FOR CREATE TABLE
1109    ///
1110    /// Pure read: computes the schema diff between the target table's
1111    /// current `CollectionContract` and the embedded `CREATE TABLE` body,
1112    /// and returns it as SQL `ALTER TABLE` text (default) or structured
1113    /// JSON. Never mutates storage.
1114    pub fn execute_explain_alter(
1115        &self,
1116        raw_query: &str,
1117        query: &ExplainAlterQuery,
1118    ) -> RedDBResult<RuntimeQueryResult> {
1119        // Validate the target CREATE TABLE body so syntactically valid
1120        // but semantically broken targets (bad SQL types, duplicate
1121        // columns) are caught here rather than inside the diff engine.
1122        analyze_create_table(&query.target).map_err(|err| RedDBError::Query(err.to_string()))?;
1123
1124        let current_contract = self.inner.db.collection_contract(&query.target.name);
1125
1126        let current_columns: Vec<crate::physical::DeclaredColumnContract> = current_contract
1127            .as_ref()
1128            .map(|c| c.declared_columns.clone())
1129            .unwrap_or_default();
1130
1131        let diff = super::schema_diff::compute_column_diff(
1132            &query.target.name,
1133            &current_columns,
1134            &query.target.columns,
1135        );
1136
1137        let rendered = match query.format {
1138            ExplainFormat::Sql => super::schema_diff::format_as_sql(&diff),
1139            ExplainFormat::Json => super::schema_diff::format_as_json(&diff),
1140        };
1141
1142        let format_label = match query.format {
1143            ExplainFormat::Sql => "sql",
1144            ExplainFormat::Json => "json",
1145        };
1146
1147        let columns = vec![
1148            "table".to_string(),
1149            "format".to_string(),
1150            "diff".to_string(),
1151        ];
1152        let row = vec![
1153            ("table".to_string(), Value::text(query.target.name.clone())),
1154            ("format".to_string(), Value::text(format_label.to_string())),
1155            ("diff".to_string(), Value::text(rendered)),
1156        ];
1157
1158        Ok(RuntimeQueryResult::ok_records(
1159            raw_query.to_string(),
1160            columns,
1161            vec![row],
1162            "explain",
1163        ))
1164    }
1165
1166    /// Execute CREATE INDEX
1167    ///
1168    /// Registers a new index on a collection, builds it from existing data,
1169    /// and makes it available to the query executor for O(1) lookups.
1170    pub fn execute_create_index(
1171        &self,
1172        raw_query: &str,
1173        query: &CreateIndexQuery,
1174    ) -> RedDBResult<RuntimeQueryResult> {
1175        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1176        let store = self.inner.db.store();
1177
1178        // Verify the table exists
1179        let manager = store
1180            .get_collection(&query.table)
1181            .ok_or_else(|| RedDBError::NotFound(format!("table '{}' not found", query.table)))?;
1182
1183        let method_kind = match query.method {
1184            IndexMethod::Hash => super::index_store::IndexMethodKind::Hash,
1185            IndexMethod::BTree => super::index_store::IndexMethodKind::BTree,
1186            IndexMethod::Bitmap => super::index_store::IndexMethodKind::Bitmap,
1187            IndexMethod::RTree => super::index_store::IndexMethodKind::Spatial,
1188        };
1189
1190        // Extract fields from existing entities for indexing. Row
1191        // entities may arrive in either the "named" HashMap layout
1192        // (gRPC `BulkInsertBinary` path) OR the columnar layout
1193        // (HTTP `POST /collections/X/bulk/rows` fast path, which uses
1194        // `schema: Some(Arc<Vec<String>>)` + `columns: Vec<Value>` and
1195        // leaves `named == None`). Prior to this commit the columnar
1196        // branch returned an empty field list, so `CREATE INDEX` built
1197        // a zero-entity index over HTTP-inserted data even though the
1198        // data was queryable via `SELECT`.
1199        let entities = manager.query_all(|_| true);
1200        let entity_fields: Vec<(crate::storage::unified::EntityId, Vec<(String, Value)>)> =
1201            entities
1202                .iter()
1203                .map(|e| {
1204                    let fields = match &e.data {
1205                        crate::storage::EntityData::Row(row) => {
1206                            if let Some(ref named) = row.named {
1207                                named.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
1208                            } else if let Some(ref schema) = row.schema {
1209                                // Columnar layout — pair each column
1210                                // with its positional name from the
1211                                // shared schema Arc.
1212                                schema
1213                                    .iter()
1214                                    .zip(row.columns.iter())
1215                                    .map(|(k, v)| (k.clone(), v.clone()))
1216                                    .collect()
1217                            } else {
1218                                Vec::new()
1219                            }
1220                        }
1221                        crate::storage::EntityData::Node(node) => node
1222                            .properties
1223                            .iter()
1224                            .map(|(k, v)| (k.clone(), v.clone()))
1225                            .collect(),
1226                        _ => Vec::new(),
1227                    };
1228                    (e.id, fields)
1229                })
1230                .collect();
1231
1232        // Build the index
1233        let indexed_count = self
1234            .inner
1235            .index_store
1236            .create_index(
1237                &query.name,
1238                &query.table,
1239                &query.columns,
1240                method_kind,
1241                query.unique,
1242                &entity_fields,
1243            )
1244            .map_err(RedDBError::Internal)?;
1245
1246        let analyzed = crate::storage::query::planner::stats_catalog::analyze_entity_fields(
1247            &query.table,
1248            &entity_fields,
1249        );
1250        crate::storage::query::planner::stats_catalog::persist_table_stats(&store, &analyzed);
1251        self.invalidate_plan_cache();
1252
1253        // Register metadata
1254        self.inner
1255            .index_store
1256            .register(super::index_store::RegisteredIndex {
1257                name: query.name.clone(),
1258                collection: query.table.clone(),
1259                columns: query.columns.clone(),
1260                method: method_kind,
1261                unique: query.unique,
1262            });
1263        // Issue #120 — surface the index name + indexed columns in
1264        // the schema-vocabulary so AskPipeline (#121) can resolve
1265        // "the email index" back to its collection.
1266        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::CreateIndex {
1267            collection: query.table.clone(),
1268            index: query.name.clone(),
1269            columns: query.columns.clone(),
1270        });
1271
1272        let method_str = format!("{}", query.method);
1273        let unique_str = if query.unique { "unique " } else { "" };
1274        let cols = query.columns.join(", ");
1275
1276        Ok(RuntimeQueryResult::ok_message(
1277            raw_query.to_string(),
1278            &format!(
1279                "{}index '{}' created on '{}' ({}) using {} ({} entities indexed)",
1280                unique_str, query.name, query.table, cols, method_str, indexed_count
1281            ),
1282            "create",
1283        ))
1284    }
1285
1286    /// Execute DROP INDEX
1287    ///
1288    /// Removes an index from a collection.
1289    pub fn execute_drop_index(
1290        &self,
1291        raw_query: &str,
1292        query: &DropIndexQuery,
1293    ) -> RedDBResult<RuntimeQueryResult> {
1294        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1295        let store = self.inner.db.store();
1296
1297        // Verify the table exists
1298        if store.get_collection(&query.table).is_none() {
1299            if query.if_exists {
1300                return Ok(RuntimeQueryResult::ok_message(
1301                    raw_query.to_string(),
1302                    &format!("table '{}' does not exist", query.table),
1303                    "drop",
1304                ));
1305            }
1306            return Err(RedDBError::NotFound(format!(
1307                "table '{}' not found",
1308                query.table
1309            )));
1310        }
1311
1312        // Remove from IndexStore
1313        self.inner.index_store.drop_index(&query.name, &query.table);
1314        self.invalidate_plan_cache();
1315        // Issue #120 — keep the schema-vocabulary index entry in sync.
1316        self.schema_vocabulary_apply(crate::runtime::schema_vocabulary::DdlEvent::DropIndex {
1317            collection: query.table.clone(),
1318            index: query.name.clone(),
1319        });
1320
1321        Ok(RuntimeQueryResult::ok_message(
1322            raw_query.to_string(),
1323            &format!("index '{}' dropped from '{}'", query.name, query.table),
1324            "drop",
1325        ))
1326    }
1327
1328    fn execute_drop_typed_collection(
1329        &self,
1330        raw_query: &str,
1331        name: &str,
1332        if_exists: bool,
1333        expected_model: CollectionModel,
1334        label: &str,
1335    ) -> RedDBResult<RuntimeQueryResult> {
1336        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1337        if is_system_schema_name(name) {
1338            return Err(RedDBError::Query("system schema is read-only".to_string()));
1339        }
1340        let store = self.inner.db.store();
1341        if store.get_collection(name).is_none() {
1342            if if_exists {
1343                return Ok(RuntimeQueryResult::ok_message(
1344                    raw_query.to_string(),
1345                    &format!("{label} '{name}' does not exist"),
1346                    "drop",
1347                ));
1348            }
1349            return Err(RedDBError::NotFound(format!("{label} '{name}' not found")));
1350        }
1351
1352        let actual = polymorphic_resolver::resolve(name, &self.inner.db.catalog_model_snapshot())?;
1353        polymorphic_resolver::ensure_model_match(expected_model, actual)?;
1354        self.drop_collection_storage(raw_query, name, label)
1355    }
1356
1357    pub fn execute_truncate(
1358        &self,
1359        raw_query: &str,
1360        query: &TruncateQuery,
1361    ) -> RedDBResult<RuntimeQueryResult> {
1362        self.check_write(crate::runtime::write_gate::WriteKind::Ddl)?;
1363        if is_system_schema_name(&query.name) {
1364            return Err(RedDBError::Query("system schema is read-only".to_string()));
1365        }
1366
1367        let label = query
1368            .model
1369            .map(polymorphic_resolver::model_name)
1370            .unwrap_or("collection");
1371        let store = self.inner.db.store();
1372        if store.get_collection(&query.name).is_none() {
1373            if query.if_exists {
1374                return Ok(RuntimeQueryResult::ok_message(
1375                    raw_query.to_string(),
1376                    &format!("{label} '{}' does not exist", query.name),
1377                    "truncate",
1378                ));
1379            }
1380            return Err(RedDBError::NotFound(format!(
1381                "{label} '{}' not found",
1382                query.name
1383            )));
1384        }
1385
1386        let actual =
1387            polymorphic_resolver::resolve(&query.name, &self.inner.db.catalog_model_snapshot())?;
1388        if let Some(expected) = query.model {
1389            polymorphic_resolver::ensure_model_match(expected, actual)?;
1390        }
1391
1392        if actual == CollectionModel::Queue {
1393            return self.execute_queue_command(
1394                raw_query,
1395                &QueueCommand::Purge {
1396                    queue: query.name.clone(),
1397                },
1398            );
1399        }
1400
1401        // Count before wiping so we can emit the aggregated truncate event.
1402        let affected = self.truncate_collection_entities(&query.name)?;
1403        // Emit 1 truncate event (not N delete events) for event-enabled collections.
1404        crate::runtime::mutation::emit_truncate_event_for_collection(self, &query.name, affected)?;
1405        self.inner.db.invalidate_vector_index(&query.name);
1406        self.clear_table_planner_stats(&query.name);
1407        self.invalidate_result_cache();
1408
1409        Ok(RuntimeQueryResult::ok_message(
1410            raw_query.to_string(),
1411            &format!(
1412                "{affected} entities truncated from {label} '{}'",
1413                query.name
1414            ),
1415            "truncate",
1416        ))
1417    }
1418
1419    fn truncate_collection_entities(&self, name: &str) -> RedDBResult<u64> {
1420        let store = self.inner.db.store();
1421        let Some(manager) = store.get_collection(name) else {
1422            return Ok(0);
1423        };
1424        let entities = manager.query_all(|_| true);
1425        if entities.is_empty() {
1426            return Ok(0);
1427        }
1428
1429        for entity in &entities {
1430            let fields = entity_index_fields(&entity.data);
1431            self.inner
1432                .index_store
1433                .index_entity_delete(name, entity.id, &fields)
1434                .map_err(RedDBError::Internal)?;
1435        }
1436
1437        let ids = entities.iter().map(|entity| entity.id).collect::<Vec<_>>();
1438        let deleted_ids = store
1439            .delete_batch(name, &ids)
1440            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1441        for id in &deleted_ids {
1442            store.context_index().remove_entity(*id);
1443        }
1444        Ok(deleted_ids.len() as u64)
1445    }
1446
    /// Drop the collection `name` and tear down the runtime state kept for it.
    ///
    /// Steps, in this order:
    /// 1. Emit one `collection_dropped` event carrying the pre-drop entity count.
    /// 2. Drop every index that `index_store` lists for the collection.
    /// 3. Drop the collection from the store.
    /// 4. Clear the vector index, the default TTL, the collection contract and the
    ///    planner stats.
    /// 5. Invalidate the result cache and (if an auth store is configured) its
    ///    visible-collections cache.
    /// 6. Persist metadata and apply a `DropCollection` schema-vocabulary event.
    ///
    /// `label` is used only in the success message (`"{label} '{name}' dropped"`).
    ///
    /// # Errors
    ///
    /// Propagates any error from the drop-event emission. Returns
    /// `RedDBError::Internal` if dropping the collection, removing its contract,
    /// or persisting metadata fails. The sequence is not transactional. A failure
    /// part-way leaves the earlier steps applied (for example, the event already
    /// emitted and the indexes already dropped).
    fn drop_collection_storage(
        &self,
        raw_query: &str,
        name: &str,
        label: &str,
    ) -> RedDBResult<RuntimeQueryResult> {
        let store = self.inner.db.store();

        // Emit a single collection_dropped event before storage is wiped.
        // Event queues are left in place; the subscription goes away when the
        // contract is removed below.
        // NOTE(review): `query_all(..).len()` materializes every entity just to
        // count them; a cheaper count API would help on large collections.
        let final_count = store
            .get_collection(name)
            .map(|manager| manager.query_all(|_| true).len() as u64)
            .unwrap_or(0);
        crate::runtime::mutation::emit_collection_dropped_event_for_collection(
            self,
            name,
            final_count,
        )?;

        // Collect the index names first, then drop each one, so the listing is
        // complete before any index is removed. The result of `drop_index`
        // (if it returns one) is not checked.
        let orphaned_indices: Vec<String> = self
            .inner
            .index_store
            .list_indices(name)
            .into_iter()
            .map(|index| index.name)
            .collect();
        for index_name in &orphaned_indices {
            self.inner.index_store.drop_index(index_name, name);
        }

        store
            .drop_collection(name)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        // Clear the remaining per-collection state and caches.
        self.inner.db.invalidate_vector_index(name);
        self.inner.db.clear_collection_default_ttl_ms(name);
        self.inner
            .db
            .remove_collection_contract(name)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        self.clear_table_planner_stats(name);
        self.invalidate_result_cache();
        // `store` here shadows the unified store and is the auth store.
        if let Some(store) = self.inner.auth_store.read().clone() {
            store.invalidate_visible_collections_cache();
        }
        self.inner
            .db
            .persist_metadata()
            .map_err(|err| RedDBError::Internal(err.to_string()))?;
        self.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::DropCollection {
                collection: name.to_string(),
            },
        );

        Ok(RuntimeQueryResult::ok_message(
            raw_query.to_string(),
            &format!("{label} '{name}' dropped"),
            "drop",
        ))
    }
1508}
1509
/// Returns `true` when `name` is reserved for RedDB's own system schema.
///
/// Reserved names are the bare name `red`, anything in the `red.` namespace,
/// and anything prefixed with `__red_schema_`. The comparison is
/// case-sensitive.
pub(crate) fn is_system_schema_name(name: &str) -> bool {
    const RESERVED_PREFIXES: [&str; 2] = ["red.", "__red_schema_"];
    if name == "red" {
        return true;
    }
    RESERVED_PREFIXES
        .iter()
        .any(|prefix| name.starts_with(*prefix))
}
1513
1514fn entity_index_fields(data: &EntityData) -> Vec<(String, Value)> {
1515    match data {
1516        EntityData::Row(row) => {
1517            if let Some(ref named) = row.named {
1518                named
1519                    .iter()
1520                    .map(|(key, value)| (key.clone(), value.clone()))
1521                    .collect()
1522            } else if let Some(ref schema) = row.schema {
1523                schema
1524                    .iter()
1525                    .zip(row.columns.iter())
1526                    .map(|(key, value)| (key.clone(), value.clone()))
1527                    .collect()
1528            } else {
1529                Vec::new()
1530            }
1531        }
1532        EntityData::Node(node) => node
1533            .properties
1534            .iter()
1535            .map(|(key, value)| (key.clone(), value.clone()))
1536            .collect(),
1537        _ => Vec::new(),
1538    }
1539}
1540
1541fn collection_contract_from_create_table(
1542    query: &CreateTableQuery,
1543) -> RedDBResult<crate::physical::CollectionContract> {
1544    let now = current_unix_ms();
1545    let mut declared_columns: Vec<crate::physical::DeclaredColumnContract> = query
1546        .columns
1547        .iter()
1548        .map(declared_column_contract_from_ddl)
1549        .collect();
1550    if query.timestamps {
1551        // Opt-in `WITH timestamps = true` auto-adds two user-visible
1552        // columns that the write path populates from
1553        // UnifiedEntity::created_at/updated_at. BIGINT unix-ms, NOT NULL.
1554        declared_columns.push(crate::physical::DeclaredColumnContract {
1555            name: "created_at".to_string(),
1556            data_type: "BIGINT".to_string(),
1557            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1558            not_null: true,
1559            default: None,
1560            compress: None,
1561            unique: false,
1562            primary_key: false,
1563            enum_variants: Vec::new(),
1564            array_element: None,
1565            decimal_precision: None,
1566        });
1567        declared_columns.push(crate::physical::DeclaredColumnContract {
1568            name: "updated_at".to_string(),
1569            data_type: "BIGINT".to_string(),
1570            sql_type: Some(crate::storage::schema::SqlTypeName::simple("BIGINT")),
1571            not_null: true,
1572            default: None,
1573            compress: None,
1574            unique: false,
1575            primary_key: false,
1576            enum_variants: Vec::new(),
1577            array_element: None,
1578            decimal_precision: None,
1579        });
1580    }
1581    Ok(crate::physical::CollectionContract {
1582        name: query.name.clone(),
1583        declared_model: crate::catalog::CollectionModel::Table,
1584        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1585        origin: crate::physical::ContractOrigin::Explicit,
1586        version: 1,
1587        created_at_unix_ms: now,
1588        updated_at_unix_ms: now,
1589        default_ttl_ms: query.default_ttl_ms,
1590        vector_dimension: None,
1591        vector_metric: None,
1592        context_index_fields: query.context_index_fields.clone(),
1593        declared_columns,
1594        table_def: Some(build_table_def_from_create_table(query)?),
1595        timestamps_enabled: query.timestamps,
1596        context_index_enabled: query.context_index_enabled
1597            || !query.context_index_fields.is_empty(),
1598        metrics_raw_retention_ms: None,
1599        metrics_rollup_policies: Vec::new(),
1600        metrics_tenant_identity: None,
1601        metrics_namespace: None,
1602        append_only: query.append_only,
1603        subscriptions: query.subscriptions.clone(),
1604        session_key: None,
1605        session_gap_ms: None,
1606        retention_duration_ms: None,
1607    })
1608}
1609
1610fn default_collection_contract_for_existing_table(
1611    name: &str,
1612) -> crate::physical::CollectionContract {
1613    let now = current_unix_ms();
1614    crate::physical::CollectionContract {
1615        name: name.to_string(),
1616        declared_model: crate::catalog::CollectionModel::Table,
1617        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1618        origin: crate::physical::ContractOrigin::Explicit,
1619        version: 0,
1620        created_at_unix_ms: now,
1621        updated_at_unix_ms: now,
1622        default_ttl_ms: None,
1623        vector_dimension: None,
1624        vector_metric: None,
1625        context_index_fields: Vec::new(),
1626        declared_columns: Vec::new(),
1627        table_def: Some(crate::storage::schema::TableDef::new(name.to_string())),
1628        timestamps_enabled: false,
1629        context_index_enabled: false,
1630        metrics_raw_retention_ms: None,
1631        metrics_rollup_policies: Vec::new(),
1632        metrics_tenant_identity: None,
1633        metrics_namespace: None,
1634        append_only: false,
1635        subscriptions: Vec::new(),
1636        session_key: None,
1637        session_gap_ms: None,
1638        retention_duration_ms: None,
1639    }
1640}
1641
1642fn keyed_collection_contract(
1643    name: &str,
1644    model: crate::catalog::CollectionModel,
1645) -> crate::physical::CollectionContract {
1646    let now = current_unix_ms();
1647    crate::physical::CollectionContract {
1648        name: name.to_string(),
1649        declared_model: model,
1650        schema_mode: crate::catalog::SchemaMode::Dynamic,
1651        origin: crate::physical::ContractOrigin::Explicit,
1652        version: 1,
1653        created_at_unix_ms: now,
1654        updated_at_unix_ms: now,
1655        default_ttl_ms: None,
1656        vector_dimension: None,
1657        vector_metric: None,
1658        context_index_fields: Vec::new(),
1659        declared_columns: Vec::new(),
1660        table_def: None,
1661        timestamps_enabled: false,
1662        context_index_enabled: false,
1663        metrics_raw_retention_ms: None,
1664        metrics_rollup_policies: Vec::new(),
1665        metrics_tenant_identity: None,
1666        metrics_namespace: None,
1667        append_only: false,
1668        subscriptions: Vec::new(),
1669        session_key: None,
1670        session_gap_ms: None,
1671        retention_duration_ms: None,
1672    }
1673}
1674
1675fn metrics_collection_contract(query: &CreateTableQuery) -> crate::physical::CollectionContract {
1676    let now = current_unix_ms();
1677    crate::physical::CollectionContract {
1678        name: query.name.clone(),
1679        declared_model: crate::catalog::CollectionModel::Metrics,
1680        schema_mode: crate::catalog::SchemaMode::SemiStructured,
1681        origin: crate::physical::ContractOrigin::Explicit,
1682        version: 1,
1683        created_at_unix_ms: now,
1684        updated_at_unix_ms: now,
1685        default_ttl_ms: query.default_ttl_ms,
1686        vector_dimension: None,
1687        vector_metric: None,
1688        context_index_fields: Vec::new(),
1689        declared_columns: Vec::new(),
1690        table_def: None,
1691        timestamps_enabled: false,
1692        context_index_enabled: false,
1693        metrics_raw_retention_ms: query.default_ttl_ms,
1694        metrics_rollup_policies: query.metrics_rollup_policies.clone(),
1695        metrics_tenant_identity: Some(
1696            query
1697                .tenant_by
1698                .clone()
1699                .unwrap_or_else(|| "current_tenant".to_string()),
1700        ),
1701        metrics_namespace: Some("default".to_string()),
1702        append_only: true,
1703        subscriptions: Vec::new(),
1704        session_key: None,
1705        session_gap_ms: None,
1706        retention_duration_ms: None,
1707    }
1708}
1709
1710fn vector_collection_contract(query: &CreateVectorQuery) -> crate::physical::CollectionContract {
1711    let now = current_unix_ms();
1712    crate::physical::CollectionContract {
1713        name: query.name.clone(),
1714        declared_model: crate::catalog::CollectionModel::Vector,
1715        schema_mode: crate::catalog::SchemaMode::Dynamic,
1716        origin: crate::physical::ContractOrigin::Explicit,
1717        version: 1,
1718        created_at_unix_ms: now,
1719        updated_at_unix_ms: now,
1720        default_ttl_ms: None,
1721        vector_dimension: Some(query.dimension),
1722        vector_metric: Some(query.metric),
1723        context_index_fields: Vec::new(),
1724        declared_columns: Vec::new(),
1725        table_def: None,
1726        timestamps_enabled: false,
1727        context_index_enabled: false,
1728        metrics_raw_retention_ms: None,
1729        metrics_rollup_policies: Vec::new(),
1730        metrics_tenant_identity: None,
1731        metrics_namespace: None,
1732        append_only: false,
1733        subscriptions: Vec::new(),
1734        session_key: None,
1735        session_gap_ms: None,
1736        retention_duration_ms: None,
1737    }
1738}
1739
1740fn declared_column_contract_from_ddl(
1741    column: &CreateColumnDef,
1742) -> crate::physical::DeclaredColumnContract {
1743    crate::physical::DeclaredColumnContract {
1744        name: column.name.clone(),
1745        data_type: column.data_type.clone(),
1746        sql_type: Some(column.sql_type.clone()),
1747        not_null: column.not_null,
1748        default: column.default.clone(),
1749        compress: column.compress,
1750        unique: column.unique,
1751        primary_key: column.primary_key,
1752        enum_variants: column.enum_variants.clone(),
1753        array_element: column.array_element.clone(),
1754        decimal_precision: column.decimal_precision,
1755    }
1756}
1757
/// Fold a sequence of `ALTER TABLE` operations into `contract`, in order.
///
/// First ensures the contract has a `table_def`, creating an empty one named
/// after the contract if it is missing. Each operation then updates
/// `declared_columns`, `table_def`, or one of the contract-level fields some
/// operations control (`append_only`, `subscriptions`,
/// `retention_duration_ms`). Operations whose state is persisted outside the
/// contract are no-ops here.
///
/// This function never fails, and conflicting operations are skipped silently:
/// - an `ADD COLUMN` whose definition `column_def_from_ddl` rejects only adds
///   the declared column (no `table_def` column, no constraints);
/// - a `RENAME COLUMN` onto a name that is already declared is ignored entirely.
///
/// It does not touch `version` or `updated_at_unix_ms`.
/// NOTE(review): presumably the caller bumps these; confirm.
fn apply_alter_operations_to_contract(
    contract: &mut crate::physical::CollectionContract,
    operations: &[AlterOperation],
) {
    if contract.table_def.is_none() {
        contract.table_def = Some(crate::storage::schema::TableDef::new(contract.name.clone()));
    }
    for operation in operations {
        match operation {
            AlterOperation::AddColumn(column) => {
                // The declared-column list and the table_def are de-duplicated
                // independently of each other.
                if !contract
                    .declared_columns
                    .iter()
                    .any(|existing| existing.name == column.name)
                {
                    contract
                        .declared_columns
                        .push(declared_column_contract_from_ddl(column));
                }
                if let Some(table_def) = contract.table_def.as_mut() {
                    if table_def.get_column(&column.name).is_none() {
                        if let Ok(column_def) = column_def_from_ddl(column) {
                            // Constraint names are derived from the column name
                            // (`pk_`, `uniq_`, `not_null_` prefixes).
                            if column.primary_key {
                                table_def.primary_key.push(column.name.clone());
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("pk_{}", column.name),
                                        crate::storage::schema::ConstraintType::PrimaryKey,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            if column.unique {
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("uniq_{}", column.name),
                                        crate::storage::schema::ConstraintType::Unique,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            if column.not_null {
                                table_def.constraints.push(
                                    crate::storage::schema::Constraint::new(
                                        format!("not_null_{}", column.name),
                                        crate::storage::schema::ConstraintType::NotNull,
                                    )
                                    .on_columns(vec![column.name.clone()]),
                                );
                            }
                            table_def.columns.push(column_def);
                        }
                    }
                }
            }
            AlterOperation::DropColumn(name) => {
                contract
                    .declared_columns
                    .retain(|column| column.name != *name);
                if let Some(table_def) = contract.table_def.as_mut() {
                    if let Some(index) = table_def.column_index(name) {
                        table_def.columns.remove(index);
                    }
                    table_def.primary_key.retain(|column| column != name);
                    // Any constraint or index that mentions the dropped column
                    // is removed whole, including multi-column ones.
                    table_def.constraints.retain(|constraint| {
                        !constraint.columns.iter().any(|column| column == name)
                    });
                    table_def
                        .indexes
                        .retain(|index| !index.columns.iter().any(|column| column == name));
                }
            }
            AlterOperation::RenameColumn { from, to } => {
                // Refuse to rename onto a name that is already declared; the
                // whole operation is skipped.
                if contract
                    .declared_columns
                    .iter()
                    .any(|column| column.name == *to)
                {
                    continue;
                }
                if let Some(column) = contract
                    .declared_columns
                    .iter_mut()
                    .find(|column| column.name == *from)
                {
                    column.name = to.clone();
                }
                // Rewrite every reference to `from` in table_def: the column
                // itself, primary key, constraint columns, constraint
                // `ref_columns`, and index columns. Constraint names such as
                // `pk_<from>` are left unchanged.
                if let Some(table_def) = contract.table_def.as_mut() {
                    if let Some(column) = table_def
                        .columns
                        .iter_mut()
                        .find(|column| column.name == *from)
                    {
                        column.name = to.clone();
                    }
                    for primary_key in &mut table_def.primary_key {
                        if *primary_key == *from {
                            *primary_key = to.clone();
                        }
                    }
                    for constraint in &mut table_def.constraints {
                        for column in &mut constraint.columns {
                            if *column == *from {
                                *column = to.clone();
                            }
                        }
                        // NOTE(review): if `ref_columns` names columns of a
                        // different (referenced) table, renaming them here is
                        // only correct for self-referencing constraints; confirm.
                        if let Some(ref_columns) = constraint.ref_columns.as_mut() {
                            for column in ref_columns {
                                if *column == *from {
                                    *column = to.clone();
                                }
                            }
                        }
                    }
                    for index in &mut table_def.indexes {
                        for column in &mut index.columns {
                            if *column == *from {
                                *column = to.clone();
                            }
                        }
                    }
                }
            }
            // Partition ops don't touch the column contract — metadata is
            // persisted separately via `red_config.partition.*`.
            AlterOperation::AttachPartition { .. } | AlterOperation::DetachPartition { .. } => {}
            // RLS toggles don't touch the column contract — flag is persisted
            // separately via `red_config.rls.enabled.{table}` and enforced
            // through the in-memory `rls_enabled_tables` set.
            AlterOperation::EnableRowLevelSecurity | AlterOperation::DisableRowLevelSecurity => {}
            // Phase 2.5.4: tenancy toggles persist via `red_config.tenant_tables.*`
            // and are enforced through `tenant_tables` + RLS auto-policy.
            AlterOperation::EnableTenancy { .. } | AlterOperation::DisableTenancy => {}
            AlterOperation::SetAppendOnly(on) => {
                contract.append_only = *on;
            }
            // VCS opt-in is persisted to red_vcs_settings by the
            // executor, not the contract — nothing to do here.
            AlterOperation::SetVersioned(_) => {}
            AlterOperation::EnableEvents(subscription) => {
                // Keyed by target queue: replace an existing subscription to
                // the same queue, otherwise append. The source is always forced
                // to this contract and the subscription is enabled.
                let mut subscription = subscription.clone();
                subscription.source = contract.name.clone();
                subscription.enabled = true;
                if let Some(existing) = contract
                    .subscriptions
                    .iter_mut()
                    .find(|existing| existing.target_queue == subscription.target_queue)
                {
                    *existing = subscription;
                } else {
                    contract.subscriptions.push(subscription);
                }
            }
            // Disables every subscription but keeps them on the contract.
            AlterOperation::DisableEvents => {
                for subscription in &mut contract.subscriptions {
                    subscription.enabled = false;
                }
            }
            // Upsert keyed by subscription name (unlike EnableEvents, which is
            // keyed by target queue).
            AlterOperation::AddSubscription { name, descriptor } => {
                let mut sub = descriptor.clone();
                sub.name = name.clone();
                sub.source = contract.name.clone();
                sub.enabled = true;
                if let Some(existing) = contract.subscriptions.iter_mut().find(|s| s.name == *name)
                {
                    *existing = sub;
                } else {
                    contract.subscriptions.push(sub);
                }
            }
            AlterOperation::DropSubscription { name } => {
                contract.subscriptions.retain(|s| s.name != *name);
            }
            // Signer registry mutations live in `red_config` outside the
            // contract surface — the executor applied them directly via
            // `signed_writes_kind::{add,revoke}_signer`. Nothing to fold
            // into the column-shaped contract.
            AlterOperation::AddSigner { .. } | AlterOperation::RevokeSigner { .. } => {}
            AlterOperation::SetRetention { duration_ms } => {
                contract.retention_duration_ms = Some(*duration_ms);
            }
            AlterOperation::UnsetRetention => {
                contract.retention_duration_ms = None;
            }
        }
    }
}
1945
1946/// Issue #580 — returns true if the contract carries at least one
1947/// column the retention filter can use as a timestamp anchor:
1948/// either `WITH timestamps = true` (auto `created_at` / `updated_at`)
1949/// or a user-declared column with a temporal data_type.
1950pub(crate) fn retention_timestamp_column_exists(
1951    contract: &crate::physical::CollectionContract,
1952) -> bool {
1953    if contract.timestamps_enabled {
1954        return true;
1955    }
1956    if matches!(
1957        contract.declared_model,
1958        crate::catalog::CollectionModel::TimeSeries | crate::catalog::CollectionModel::Metrics
1959    ) {
1960        // Time-series and metrics collections carry an intrinsic
1961        // timestamp axis on every row even without a declared column;
1962        // retention has a natural anchor.
1963        return true;
1964    }
1965    contract
1966        .declared_columns
1967        .iter()
1968        .any(|column| is_temporal_data_type(&column.data_type))
1969}
1970
/// Returns true when `data_type` names one of the temporal types retention
/// understands, compared ASCII-case-insensitively: `TIMESTAMP`, `TIMESTAMPMS`,
/// `TIMESTAMP_MS`, `DATETIME` or `DATE`.
///
/// The whole string must match, so parameterized or padded spellings such as
/// `TIMESTAMP(3)` or `" DATE"` are not recognized.
fn is_temporal_data_type(data_type: &str) -> bool {
    const TEMPORAL_TYPES: [&str; 5] = [
        "TIMESTAMP",
        "TIMESTAMPMS",
        "TIMESTAMP_MS",
        "DATETIME",
        "DATE",
    ];
    TEMPORAL_TYPES
        .iter()
        .any(|candidate| data_type.eq_ignore_ascii_case(candidate))
}
1978
1979fn validate_event_subscriptions(
1980    runtime: &RedDBRuntime,
1981    source: &str,
1982    subscriptions: &[crate::catalog::SubscriptionDescriptor],
1983) -> RedDBResult<()> {
1984    for subscription in subscriptions
1985        .iter()
1986        .filter(|subscription| subscription.enabled)
1987    {
1988        if subscription.all_tenants && crate::runtime::impl_core::current_tenant().is_some() {
1989            return Err(RedDBError::Query(
1990                "cross-tenant subscription requires cluster-admin capability (events:cluster_subscribe)".to_string(),
1991            ));
1992        }
1993        validate_subscription_auth(runtime, source, subscription)?;
1994        if subscription.target_queue == source
1995            || subscription_would_create_cycle(
1996                &runtime.inner.db,
1997                source,
1998                &subscription.target_queue,
1999            )
2000        {
2001            return Err(RedDBError::Query(
2002                "subscription would create cycle".to_string(),
2003            ));
2004        }
2005        audit_subscription_redact_gap(runtime, source, subscription);
2006    }
2007    Ok(())
2008}
2009
/// Check that the current principal may subscribe `source` into
/// `subscription.target_queue`.
///
/// Returns `Ok(())` without checking anything when the runtime has no auth
/// store, or when there is no current auth identity.
///
/// When IAM authorization is enabled, IAM policy must allow `select` on the
/// `table` resource `source` and `write` on the `queue` resource for the target
/// queue. Both resources are tenant-scoped when a tenant is active. Otherwise
/// the function falls back to grant checks: `Select` on the source table and
/// `Insert` on the target queue, resolved with `Resource::table_from_name`.
///
/// NOTE(review): the two paths check different things on the target (`write`
/// on a `queue` resource vs `Insert` on a table resource); confirm this
/// asymmetry is intended.
///
/// # Errors
///
/// `RedDBError::Query` with a "permission denied" message when either check
/// fails.
fn validate_subscription_auth(
    runtime: &RedDBRuntime,
    source: &str,
    subscription: &crate::catalog::SubscriptionDescriptor,
) -> RedDBResult<()> {
    let auth_store = match runtime.inner.auth_store.read().clone() {
        Some(store) => store,
        None => return Ok(()),
    };
    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
        Some(identity) => identity,
        None => return Ok(()),
    };
    let tenant = crate::runtime::impl_core::current_tenant();
    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);

    if auth_store.iam_authorization_enabled() {
        // Principal and current tenant are both the active tenant. No peer IP
        // is supplied and MFA is reported absent, so policies conditioned on
        // either are evaluated against those values.
        let ctx = crate::auth::policies::EvalContext {
            principal_tenant: tenant.clone(),
            current_tenant: tenant.clone(),
            peer_ip: None,
            mfa_present: false,
            now_ms: crate::auth::now_ms(),
            principal_is_admin_role: role == crate::auth::Role::Admin,
        };
        // Reading from the source requires `select` on the source table.
        let mut source_resource = crate::auth::policies::ResourceRef::new("table", source);
        if let Some(t) = tenant.as_deref() {
            source_resource = source_resource.with_tenant(t.to_string());
        }
        if !auth_store.check_policy_authz(&principal, "select", &source_resource, &ctx) {
            return Err(RedDBError::Query(format!(
                "permission denied: principal=`{}` action=`select` resource=`{}:{}` denied by IAM policy",
                principal, source_resource.kind, source_resource.name
            )));
        }

        // Delivering events requires `write` on the target queue.
        let mut target_resource =
            crate::auth::policies::ResourceRef::new("queue", subscription.target_queue.clone());
        if let Some(t) = tenant.as_deref() {
            target_resource = target_resource.with_tenant(t.to_string());
        }
        if !auth_store.check_policy_authz(&principal, "write", &target_resource, &ctx) {
            return Err(RedDBError::Query(format!(
                "permission denied: principal=`{}` action=`write` resource=`{}:{}` denied by IAM policy",
                principal, target_resource.kind, target_resource.name
            )));
        }
        return Ok(());
    }

    // Legacy grant-based path: SELECT on the source, INSERT on the target
    // queue (treated as a table name).
    let ctx = crate::auth::privileges::AuthzContext {
        principal: &username,
        effective_role: role,
        tenant: tenant.as_deref(),
    };
    auth_store
        .check_grant(
            &ctx,
            crate::auth::privileges::Action::Select,
            &crate::auth::privileges::Resource::table_from_name(source),
        )
        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
    auth_store
        .check_grant(
            &ctx,
            crate::auth::privileges::Action::Insert,
            &crate::auth::privileges::Resource::table_from_name(&subscription.target_queue),
        )
        .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))?;
    Ok(())
}
2081
2082fn audit_subscription_redact_gap(
2083    runtime: &RedDBRuntime,
2084    source: &str,
2085    subscription: &crate::catalog::SubscriptionDescriptor,
2086) {
2087    let auth_store = match runtime.inner.auth_store.read().clone() {
2088        Some(store) if store.iam_authorization_enabled() => store,
2089        _ => return,
2090    };
2091    let (username, role) = match crate::runtime::impl_core::current_auth_identity() {
2092        Some(identity) => identity,
2093        None => return,
2094    };
2095    let tenant = crate::runtime::impl_core::current_tenant();
2096    let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
2097    let missing = subscription_redact_gap_columns(&auth_store, &principal, source, subscription);
2098    if missing.is_empty() {
2099        return;
2100    }
2101
2102    let columns = missing.into_iter().collect::<Vec<_>>().join(", ");
2103    tracing::warn!(
2104        target: "reddb::operator",
2105        "subscription_redact_gap: source={} target_queue={} columns=[{}]",
2106        source,
2107        subscription.target_queue,
2108        columns
2109    );
2110    let mut event = AuditEvent::builder("subscription_redact_gap")
2111        .principal(username)
2112        .source(AuditAuthSource::System)
2113        .resource(format!(
2114            "subscription:{}->{}",
2115            source, subscription.target_queue
2116        ))
2117        .outcome(Outcome::Success)
2118        .field(AuditFieldEscaper::field("source", source))
2119        .field(AuditFieldEscaper::field(
2120            "target_queue",
2121            subscription.target_queue.clone(),
2122        ))
2123        .field(AuditFieldEscaper::field(
2124            "subscription",
2125            subscription.name.clone(),
2126        ))
2127        .field(AuditFieldEscaper::field("columns", columns))
2128        .field(AuditFieldEscaper::field("role", role.as_str()));
2129    if let Some(t) = tenant {
2130        event = event.tenant(t);
2131    }
2132    runtime.inner.audit_log.record_event(event.build());
2133}
2134
2135fn subscription_redact_gap_columns(
2136    auth_store: &crate::auth::store::AuthStore,
2137    principal: &crate::auth::UserId,
2138    source: &str,
2139    subscription: &crate::catalog::SubscriptionDescriptor,
2140) -> BTreeSet<String> {
2141    let redacted: HashSet<String> = subscription
2142        .redact_fields
2143        .iter()
2144        .map(|field| field.to_ascii_lowercase())
2145        .collect();
2146    auth_store
2147        .effective_policies(principal)
2148        .iter()
2149        .flat_map(|policy| policy.statements.iter())
2150        .filter(|statement| statement.effect == crate::auth::policies::Effect::Deny)
2151        .filter(|statement| statement.actions.iter().any(action_pattern_matches_select))
2152        .flat_map(|statement| statement.resources.iter())
2153        .filter_map(|resource| denied_column_for_source(resource, source))
2154        .filter(|column| !redact_covers_column(&redacted, source, column))
2155        .collect()
2156}
2157
2158fn action_pattern_matches_select(pattern: &crate::auth::policies::ActionPattern) -> bool {
2159    match pattern {
2160        crate::auth::policies::ActionPattern::Wildcard => true,
2161        crate::auth::policies::ActionPattern::Exact(action) => action == "select",
2162        crate::auth::policies::ActionPattern::Prefix(prefix) => {
2163            "select".len() > prefix.len() + 1
2164                && "select".starts_with(prefix)
2165                && "select".as_bytes()[prefix.len()] == b':'
2166        }
2167    }
2168}
2169
2170fn denied_column_for_source(
2171    resource: &crate::auth::policies::ResourcePattern,
2172    source: &str,
2173) -> Option<String> {
2174    let crate::auth::policies::ResourcePattern::Exact { kind, name } = resource else {
2175        return None;
2176    };
2177    if kind != "column" {
2178        return None;
2179    }
2180    let column = crate::auth::ColumnRef::parse_resource_name(name).ok()?;
2181    (column.table_resource_name() == source).then_some(column.column)
2182}
2183
/// Returns true when the redaction set covers `column` of `source`.
///
/// The column is covered by the wildcard `*`, by its bare name, or by its
/// `source.column` qualified name. `source` and `column` are lower-cased before
/// the lookup; entries in `redacted` are expected to be lower-case already.
fn redact_covers_column(redacted: &HashSet<String>, source: &str, column: &str) -> bool {
    if redacted.contains("*") {
        return true;
    }
    let bare = column.to_ascii_lowercase();
    if redacted.contains(&bare) {
        return true;
    }
    let qualified = format!("{}.{}", source.to_ascii_lowercase(), bare);
    redacted.contains(&qualified)
}
2189
2190fn subscription_would_create_cycle(
2191    db: &crate::storage::unified::devx::RedDB,
2192    source: &str,
2193    target: &str,
2194) -> bool {
2195    let mut graph: HashMap<String, Vec<String>> = HashMap::new();
2196    for contract in db.collection_contracts() {
2197        for subscription in contract
2198            .subscriptions
2199            .into_iter()
2200            .filter(|subscription| subscription.enabled)
2201        {
2202            graph
2203                .entry(subscription.source)
2204                .or_default()
2205                .push(subscription.target_queue);
2206        }
2207    }
2208    graph
2209        .entry(source.to_string())
2210        .or_default()
2211        .push(target.to_string());
2212
2213    let mut stack = vec![target.to_string()];
2214    let mut seen = HashSet::new();
2215    while let Some(node) = stack.pop() {
2216        if node == source {
2217            return true;
2218        }
2219        if !seen.insert(node.clone()) {
2220            continue;
2221        }
2222        if let Some(next) = graph.get(&node) {
2223            stack.extend(next.iter().cloned());
2224        }
2225    }
2226    false
2227}
2228
2229pub(crate) fn ensure_event_target_queue_pub(
2230    runtime: &RedDBRuntime,
2231    queue: &str,
2232) -> RedDBResult<()> {
2233    ensure_event_target_queue(runtime, queue)
2234}
2235
2236fn ensure_event_target_queue(runtime: &RedDBRuntime, queue: &str) -> RedDBResult<()> {
2237    let store = runtime.inner.db.store();
2238    if store.get_collection(queue).is_some() {
2239        return Ok(());
2240    }
2241    store
2242        .create_collection(queue)
2243        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2244    runtime
2245        .inner
2246        .db
2247        .save_collection_contract(event_queue_collection_contract(queue))
2248        .map_err(|err| RedDBError::Internal(err.to_string()))?;
2249    store.set_config_tree(
2250        &format!("queue.{queue}.mode"),
2251        &crate::serde_json::Value::String("fanout".to_string()),
2252    );
2253    Ok(())
2254}
2255
2256fn event_queue_collection_contract(queue: &str) -> crate::physical::CollectionContract {
2257    let now = current_unix_ms();
2258    crate::physical::CollectionContract {
2259        name: queue.to_string(),
2260        declared_model: crate::catalog::CollectionModel::Queue,
2261        schema_mode: crate::catalog::SchemaMode::Dynamic,
2262        origin: crate::physical::ContractOrigin::Implicit,
2263        version: 1,
2264        created_at_unix_ms: now,
2265        updated_at_unix_ms: now,
2266        default_ttl_ms: None,
2267        vector_dimension: None,
2268        vector_metric: None,
2269        context_index_fields: Vec::new(),
2270        declared_columns: Vec::new(),
2271        table_def: None,
2272        timestamps_enabled: false,
2273        context_index_enabled: false,
2274        metrics_raw_retention_ms: None,
2275        metrics_rollup_policies: Vec::new(),
2276        metrics_tenant_identity: None,
2277        metrics_namespace: None,
2278        append_only: true,
2279        subscriptions: Vec::new(),
2280        session_key: None,
2281        session_gap_ms: None,
2282        retention_duration_ms: None,
2283    }
2284}
2285
2286fn build_table_def_from_create_table(
2287    query: &CreateTableQuery,
2288) -> RedDBResult<crate::storage::schema::TableDef> {
2289    let mut table = crate::storage::schema::TableDef::new(query.name.clone());
2290    for column in &query.columns {
2291        if column.primary_key {
2292            table.primary_key.push(column.name.clone());
2293            table.constraints.push(
2294                crate::storage::schema::Constraint::new(
2295                    format!("pk_{}", column.name),
2296                    crate::storage::schema::ConstraintType::PrimaryKey,
2297                )
2298                .on_columns(vec![column.name.clone()]),
2299            );
2300        }
2301        if column.unique {
2302            table.constraints.push(
2303                crate::storage::schema::Constraint::new(
2304                    format!("uniq_{}", column.name),
2305                    crate::storage::schema::ConstraintType::Unique,
2306                )
2307                .on_columns(vec![column.name.clone()]),
2308            );
2309        }
2310        if column.not_null {
2311            table.constraints.push(
2312                crate::storage::schema::Constraint::new(
2313                    format!("not_null_{}", column.name),
2314                    crate::storage::schema::ConstraintType::NotNull,
2315                )
2316                .on_columns(vec![column.name.clone()]),
2317            );
2318        }
2319        table.columns.push(column_def_from_ddl(column)?);
2320    }
2321    // WITH timestamps = true: append the two runtime-managed columns
2322    // to the schema so resolved_contract_columns exposes them to the
2323    // normalize/validate path. Declared as UnsignedInteger (unix-ms),
2324    // not-nullable; the write path auto-fills them.
2325    if query.timestamps {
2326        table.columns.push(
2327            crate::storage::schema::ColumnDef::new(
2328                "created_at".to_string(),
2329                crate::storage::schema::DataType::UnsignedInteger,
2330            )
2331            .not_null(),
2332        );
2333        table.columns.push(
2334            crate::storage::schema::ColumnDef::new(
2335                "updated_at".to_string(),
2336                crate::storage::schema::DataType::UnsignedInteger,
2337            )
2338            .not_null(),
2339        );
2340        table.constraints.push(
2341            crate::storage::schema::Constraint::new(
2342                "not_null_created_at".to_string(),
2343                crate::storage::schema::ConstraintType::NotNull,
2344            )
2345            .on_columns(vec!["created_at".to_string()]),
2346        );
2347        table.constraints.push(
2348            crate::storage::schema::Constraint::new(
2349                "not_null_updated_at".to_string(),
2350                crate::storage::schema::ConstraintType::NotNull,
2351            )
2352            .on_columns(vec!["updated_at".to_string()]),
2353        );
2354    }
2355    table
2356        .validate()
2357        .map_err(|err| RedDBError::Query(format!("invalid table definition: {err}")))?;
2358    Ok(table)
2359}
2360
2361fn column_def_from_ddl(column: &CreateColumnDef) -> RedDBResult<crate::storage::schema::ColumnDef> {
2362    let data_type = resolve_declared_data_type(&column.data_type)
2363        .map_err(|err| RedDBError::Query(err.to_string()))?;
2364    let mut column_def = crate::storage::schema::ColumnDef::new(column.name.clone(), data_type);
2365    if column.not_null {
2366        column_def = column_def.not_null();
2367    }
2368    if let Some(default) = &column.default {
2369        column_def = column_def.with_default(default.as_bytes().to_vec());
2370    }
2371    if column.compress.unwrap_or(0) > 0 {
2372        column_def = column_def.compressed();
2373    }
2374    if !column.enum_variants.is_empty() {
2375        column_def = column_def.with_variants(column.enum_variants.clone());
2376    }
2377    if let Some(precision) = column.decimal_precision {
2378        column_def = column_def.with_precision(precision);
2379    }
2380    if let Some(element_type) = &column.array_element {
2381        column_def = column_def.with_element_type(
2382            resolve_declared_data_type(element_type)
2383                .map_err(|err| RedDBError::Query(err.to_string()))?,
2384        );
2385    }
2386    column_def = column_def.with_metadata("ddl_data_type", column.data_type.clone());
2387    if column.unique {
2388        column_def = column_def.with_metadata("unique", "true");
2389    }
2390    if column.primary_key {
2391        column_def = column_def.with_metadata("primary_key", "true");
2392    }
2393    Ok(column_def)
2394}
2395
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// If the system clock reads earlier than 1970, `duration_since` fails and
/// `0` is returned instead of an error.
fn current_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2402
#[cfg(test)]
mod tests {
    //! Runtime-level tests for DDL execution. They cover:
    //! - IAM gating of DROP / TRUNCATE;
    //! - TRUNCATE semantics;
    //! - single-event emission for TRUNCATE / DROP on event-enabled
    //!   collections;
    //! - `WITH EVENTS` op and WHERE filters;
    //! - ALTER on event-enabled tables.

    use crate::auth::policies::{ActionPattern, Effect, Policy, ResourcePattern, Statement};
    use crate::auth::store::{AuthStore, PrincipalRef};
    use crate::auth::UserId;
    use crate::auth::{AuthConfig, Role};
    use crate::runtime::impl_core::{clear_current_auth_identity, set_current_auth_identity};
    use crate::storage::schema::Value;
    use crate::{RedDBOptions, RedDBRuntime};
    use std::sync::Arc;

    /// Builds a version-1, tenant-less policy with a single Allow statement
    /// granting `action` on `resource`.
    fn make_policy(id: &str, action: &str, resource: ResourcePattern) -> Policy {
        Policy {
            id: id.to_string(),
            version: 1,
            tenant: None,
            created_at: 0,
            updated_at: 0,
            statements: vec![Statement {
                sid: None,
                effect: Effect::Allow,
                actions: vec![ActionPattern::Exact(action.to_string())],
                resources: vec![resource],
                condition: None,
            }],
        }
    }

    /// Allow policy for `action` on the single collection `collection`.
    fn make_allow_policy(id: &str, action: &str, collection: &str) -> Policy {
        make_policy(
            id,
            action,
            ResourcePattern::Exact {
                kind: "collection".to_string(),
                name: collection.to_string(),
            },
        )
    }

    /// Allow policy for `action` on every resource.
    fn make_wildcard_allow_policy(id: &str, action: &str) -> Policy {
        make_policy(id, action, ResourcePattern::Wildcard)
    }

    /// Installs a fresh default `AuthStore` into the runtime and returns it.
    fn wire_auth_store(rt: &RedDBRuntime) -> Arc<AuthStore> {
        let store = Arc::new(AuthStore::new(AuthConfig::default()));
        *rt.inner.auth_store.write() = Some(store.clone());
        store
    }

    #[test]
    fn drop_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE foo (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        // Put a select-only policy so IAM mode activates, but give alice no drop policy.
        store
            .put_policy_internal(make_wildcard_allow_policy("select-only", "select"))
            .unwrap();
        let alice = UserId::from_parts(None, "alice");
        store
            .attach_policy(PrincipalRef::User(alice), "select-only")
            .unwrap();
        set_current_auth_identity("alice".to_string(), Role::Write);
        let err = rt.execute_query("DROP TABLE foo").unwrap_err();
        clear_current_auth_identity();
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    #[test]
    fn drop_allowed_with_explicit_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bar (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        let policy = make_allow_policy("allow-drop-bar", "drop", "bar");
        store.put_policy_internal(policy).unwrap();
        let bob = UserId::from_parts(None, "bob");
        store
            .attach_policy(PrincipalRef::User(bob), "allow-drop-bar")
            .unwrap();
        set_current_auth_identity("bob".to_string(), Role::Write);
        rt.execute_query("DROP TABLE bar").unwrap();
        clear_current_auth_identity();
    }

    #[test]
    fn drop_allowed_with_wildcard_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE baz (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        store
            .put_policy_internal(make_wildcard_allow_policy("allow-drop-all", "drop"))
            .unwrap();
        let carl = UserId::from_parts(None, "carl");
        store
            .attach_policy(PrincipalRef::User(carl), "allow-drop-all")
            .unwrap();
        set_current_auth_identity("carl".to_string(), Role::Write);
        rt.execute_query("DROP TABLE baz").unwrap();
        clear_current_auth_identity();
    }

    #[test]
    fn truncate_denied_without_iam_policy() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE qux (id INT)").unwrap();
        let store = wire_auth_store(&rt);
        // A policy exists (IAM active) but gives no truncate right.
        store
            .put_policy_internal(make_wildcard_allow_policy("select-only-2", "select"))
            .unwrap();
        let dana = UserId::from_parts(None, "dana");
        store
            .attach_policy(PrincipalRef::User(dana), "select-only-2")
            .unwrap();
        set_current_auth_identity("dana".to_string(), Role::Write);
        let err = rt.execute_query("TRUNCATE TABLE qux").unwrap_err();
        clear_current_auth_identity();
        assert!(
            format!("{err}").contains("denied by IAM policy"),
            "got: {err}"
        );
    }

    #[test]
    fn truncate_table_clears_rows_and_preserves_schema_and_indexes() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'ana'), (2, 'bob')")
            .unwrap();
        rt.execute_query("CREATE INDEX idx_users_id ON users (id) USING HASH")
            .unwrap();

        let truncated = rt.execute_query("TRUNCATE TABLE users").unwrap();
        assert_eq!(truncated.statement_type, "truncate");
        assert_eq!(truncated.affected_rows, 0);

        let empty = rt.execute_query("SELECT id FROM users").unwrap();
        assert!(empty.result.records.is_empty());

        rt.execute_query("INSERT INTO users (id, name) VALUES (3, 'cy')")
            .unwrap();
        let selected = rt
            .execute_query("SELECT name FROM users WHERE id = 3")
            .unwrap();
        let name = selected.result.records[0].get("name").unwrap();
        assert_eq!(name, &Value::text("cy"));
        assert!(rt.db().collection_contract("users").is_some());
        assert!(rt
            .inner
            .index_store
            .list_indices("users")
            .iter()
            .any(|index| index.name == "idx_users_id"));
    }

    #[test]
    fn truncate_collection_is_polymorphic_and_typed_mismatch_fails() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE QUEUE tasks").unwrap();
        rt.execute_query("QUEUE PUSH tasks {'job':'a'}").unwrap();

        let err = rt.execute_query("TRUNCATE TABLE tasks").unwrap_err();
        assert!(format!("{err}").contains("model mismatch: expected table, got queue"));

        rt.execute_query("TRUNCATE COLLECTION tasks").unwrap();
        let len = rt.execute_query("QUEUE LEN tasks").unwrap();
        assert_eq!(
            len.result.records[0].get("len"),
            Some(&Value::UnsignedInteger(0))
        );
    }

    #[test]
    fn truncate_system_schema_is_read_only() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        let err = rt
            .execute_query("TRUNCATE COLLECTION red.collections")
            .unwrap_err();
        assert!(format!("{err}").contains("system schema is read-only"));
    }

    // ── #302 / #310: TRUNCATE / DROP single-event semantics ────────────────

    /// Peeks up to 100 messages from `queue` and decodes each JSON payload.
    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
        let result = rt
            .execute_query(&format!("QUEUE PEEK {queue} 100"))
            .expect("peek queue");
        result
            .result
            .records
            .iter()
            .map(
                |record| match record.get("payload").expect("payload column") {
                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
                    other => panic!("expected JSON queue payload, got {other:?}"),
                },
            )
            .collect()
    }

    /// `TRUNCATE users` on an event-enabled collection emits exactly 1
    /// `truncate` event, not one delete event per row.
    #[test]
    fn truncate_event_enabled_table_emits_single_truncate_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query(
            "INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')",
        )
        .unwrap();

        // Drain the 3 insert events so we start clean.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("TRUNCATE TABLE users").unwrap();

        let events = queue_payloads(&rt, "users_events");
        // Must be exactly 1 truncate event, not 3 delete events.
        assert_eq!(
            events.len(),
            1,
            "expected 1 truncate event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("truncate")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            ev.get("entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(3)
        );
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));
    }

    /// `TRUNCATE users` on a collection without event subscription emits no events.
    #[test]
    fn truncate_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a'), (2, 'b')")
            .unwrap();
        // No EVENTS subscription — truncate must work without touching any queue.
        rt.execute_query("TRUNCATE TABLE plain").unwrap();
        // No crash, no queue to check. Just verify truncation happened.
        let rows = rt.execute_query("SELECT id FROM plain").unwrap();
        assert!(rows.result.records.is_empty());
    }

    /// `DROP TABLE users` on an event-enabled collection emits exactly 1
    /// `collection_dropped` event. The subscription is removed from the
    /// source contract but the target queue is preserved for consumer drain.
    #[test]
    fn drop_event_enabled_table_emits_single_collection_dropped_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'alice'), (2, 'bob')")
            .unwrap();

        // Drain insert events so we start clean.
        rt.execute_query("QUEUE POP users_events COUNT 10").unwrap();

        rt.execute_query("DROP TABLE users").unwrap();

        // Queue must still exist with 1 collection_dropped event.
        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 collection_dropped event, got {}",
            events.len()
        );
        let ev = events[0].as_object().expect("event is object");
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("collection_dropped")
        );
        assert_eq!(
            ev.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            ev.get("final_entities_count")
                .and_then(crate::json::Value::as_u64),
            Some(2)
        );
        assert!(ev.get("ts").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev.get("lsn").and_then(crate::json::Value::as_u64).is_some());
        assert!(ev
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|s| !s.is_empty()));

        // Source collection is gone.
        let err = rt.execute_query("SELECT id FROM users").unwrap_err();
        assert!(
            format!("{err}").contains("users"),
            "expected not-found error"
        );
    }

    /// `DROP TABLE users` on a collection without event subscription works
    /// normally with no event emitted.
    #[test]
    fn drop_no_events_collection_emits_nothing() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE plain (id INT, val TEXT)")
            .unwrap();
        rt.execute_query("INSERT INTO plain (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("DROP TABLE plain").unwrap();
        // No crash and collection is gone.
        let err = rt.execute_query("SELECT id FROM plain").unwrap_err();
        assert!(format!("{err}").contains("plain"));
    }

    // ── #297: ops_filter + WHERE filter ────────────────────────────────────

    /// `WITH EVENTS (INSERT)` — UPDATE and DELETE events must NOT be emitted.
    #[test]
    fn ops_filter_insert_only_ignores_update_and_delete() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, val TEXT) WITH EVENTS (INSERT) TO items_events",
        )
        .unwrap();
        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 'a')")
            .unwrap();
        rt.execute_query("UPDATE items SET val = 'b' WHERE id = 1")
            .unwrap();
        rt.execute_query("DELETE FROM items WHERE id = 1").unwrap();

        let events = queue_payloads(&rt, "items_events");
        // Only the INSERT should have fired.
        assert_eq!(
            events.len(),
            1,
            "expected 1 insert event, got {}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }

    /// `WITH EVENTS WHERE status = 'active'` — only rows matching the predicate generate events.
    #[test]
    fn where_filter_skips_rows_that_do_not_match() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS WHERE status = 'active' TO users_events",
        )
        .unwrap();

        // This row should generate an event.
        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active')")
            .unwrap();
        // This row should NOT generate an event.
        rt.execute_query("INSERT INTO users (id, status) VALUES (2, 'inactive')")
            .unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 event (only active), got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("insert")
        );
        let after = ev.get("after").unwrap().as_object().unwrap();
        assert_eq!(
            after.get("status").and_then(crate::json::Value::as_str),
            Some("active")
        );
    }

    /// `WITH EVENTS (INSERT, UPDATE) WHERE status = 'active'` — combination functional.
    #[test]
    fn ops_filter_and_where_filter_combined() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (INSERT, UPDATE) WHERE status = 'active' TO items_events",
        )
        .unwrap();

        // INSERT active → event
        rt.execute_query("INSERT INTO items (id, status) VALUES (1, 'active')")
            .unwrap();
        // INSERT inactive → no event
        rt.execute_query("INSERT INTO items (id, status) VALUES (2, 'inactive')")
            .unwrap();
        // UPDATE row 1 to inactive → after = inactive, no event
        rt.execute_query("UPDATE items SET status = 'inactive' WHERE id = 1")
            .unwrap();
        // DELETE → ops_filter excludes it
        rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "items_events");
        // Only the first INSERT (active) fires; UPDATE result is inactive so skipped; DELETE excluded by ops_filter.
        assert_eq!(
            events.len(),
            1,
            "expected 1 event, got {}: {events:?}",
            events.len()
        );
        assert_eq!(
            events[0]
                .as_object()
                .unwrap()
                .get("op")
                .and_then(crate::json::Value::as_str),
            Some("insert")
        );
    }

    /// WHERE filter on DELETE events — the before-state (pre-image) is evaluated.
    #[test]
    fn where_filter_on_delete_checks_before_state() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, status TEXT) WITH EVENTS (DELETE) WHERE status = 'active' TO users_events",
        )
        .unwrap();

        rt.execute_query("INSERT INTO users (id, status) VALUES (1, 'active'), (2, 'inactive')")
            .unwrap();

        // Delete active row → event (before-state was active)
        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
        // Delete inactive row → no event (before-state was inactive)
        rt.execute_query("DELETE FROM users WHERE id = 2").unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(
            events.len(),
            1,
            "expected 1 delete event, got {}",
            events.len()
        );
        let ev = events[0].as_object().unwrap();
        assert_eq!(
            ev.get("op").and_then(crate::json::Value::as_str),
            Some("delete")
        );
    }

    // ── #301: schema evolution OperatorEvent on ALTER ───────────────────────

    /// ADD COLUMN on event-enabled table must succeed (OperatorEvent is best-effort).
    #[test]
    fn alter_add_column_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO users_events")
            .unwrap();
        // Must not error — OperatorEvent emission is best-effort (no global sink in tests).
        rt.execute_query("ALTER TABLE users ADD COLUMN phone TEXT")
            .unwrap();
        // The column is now in the contract.
        let contract = rt.db().collection_contract("users").unwrap();
        assert!(
            contract.declared_columns.iter().any(|c| c.name == "phone"),
            "phone column should be in contract"
        );
        // Subscription still enabled after the alter.
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }

    /// DROP COLUMN on event-enabled table must succeed; non-column ALTERs
    /// (like ENABLE ROW LEVEL SECURITY) must also succeed without emitting.
    #[test]
    fn alter_drop_column_and_rls_on_event_enabled_table_succeeds() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE items (id INT, secret TEXT, status TEXT) WITH EVENTS TO items_events",
        )
        .unwrap();
        // DROP COLUMN — schema change event path exercises, must not error.
        rt.execute_query("ALTER TABLE items DROP COLUMN secret")
            .unwrap();
        let contract = rt.db().collection_contract("items").unwrap();
        assert!(
            !contract.declared_columns.iter().any(|c| c.name == "secret"),
            "secret column should be removed"
        );
        // ENABLE RLS — non-column op, no schema-change event (coverage).
        rt.execute_query("ALTER TABLE items ENABLE ROW LEVEL SECURITY")
            .unwrap();
        // Re-read the contract: the snapshot above predates the RLS ALTER,
        // so asserting on it would not check the post-ALTER state at all.
        let contract = rt.db().collection_contract("items").unwrap();
        assert!(
            !contract.declared_columns.iter().any(|c| c.name == "secret"),
            "secret column should stay removed after RLS alter"
        );
        // Collection and subscription still intact.
        assert!(
            contract.subscriptions.iter().any(|s| s.enabled),
            "subscription should remain enabled"
        );
    }
}