Skip to main content

reddb_server/runtime/
impl_dml.rs

1//! DML execution: INSERT, UPDATE, DELETE via SQL AST
2//!
3//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
4//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
5//! operations through the existing `RuntimeEntityPort` trait so that all
6//! cross-cutting concerns (WAL, indexing, replication) are automatically
7//! applied.
8
9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11    CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12    CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13    RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16    build_row_update_contract_plan, entity_row_fields_snapshot,
17    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18    RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27    sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28    sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
// NOTE(review): presumably the number of matched rows the UPDATE path applies
// per batch — the consumer is outside the visible part of this file; confirm.
37const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
// NOTE(review): presumably the edge label that links a tree node to its
// children — no use site is visible here; confirm.
38const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
// NOTE(review): presumably the key prefix reserved for tree-related metadata
// entries — no use site is visible here; confirm.
39const TREE_METADATA_PREFIX: &str = "red.tree.";
40
/// A single UPDATE assignment that carries its right-hand side as an
/// expression (`expr`) rather than as an already-computed value.
// NOTE(review): the code that builds and consumes this struct is outside the
// visible part of the file; the field notes below are read off names and
// types only — confirm against the UPDATE executor.
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
    /// Name of the column being assigned.
43    column: String,
    /// Right-hand-side expression of the assignment.
44    expr: Expr,
    /// Presumably the operator of a compound assignment, `None` for a plain
    /// `=` — TODO confirm.
45    compound_op: Option<BinOp>,
    /// Presumably set when the target is a metadata key rather than a row
    /// field — TODO confirm.
46    metadata_key: Option<&'static str>,
    /// Optional row-update rule associated with this column.
47    row_rule: Option<RowUpdateColumnRule>,
48}
49
/// Pre-processed form of an UPDATE statement's assignment list, split into
/// assignments stored as ready values and assignments stored as expressions.
// NOTE(review): how each field is populated and consumed is not visible in
// this part of the file; the notes below follow names and types — confirm.
50struct CompiledUpdatePlan {
    /// Field assignments held as `(column, value)` pairs.
51    static_field_assignments: Vec<(String, Value)>,
    /// Metadata assignments held as `(key, value)` pairs.
52    static_metadata_assignments: Vec<(String, MetadataValue)>,
    /// Assignments that still carry an expression.
53    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    /// Optional row-update contract plan for the target collection.
54    row_contract_plan: Option<RowUpdateContractPlan>,
    /// Presumably the row columns this UPDATE writes — TODO confirm.
55    row_modified_columns: Vec<String>,
    /// Presumably `true` when a written column has a uniqueness constraint —
    /// TODO confirm.
56    row_touches_unique_columns: bool,
57}
58
/// Value-form counterparts of a plan's dynamic assignments; `Default` yields
/// two empty vectors.
// NOTE(review): presumably filled per target entity by evaluating each dynamic
// assignment's expression — the producer is outside the visible chunk; confirm.
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
    /// Field assignments as `(column, value)` pairs.
61    dynamic_field_assignments: Vec<(String, Value)>,
    /// Metadata assignments as `(key, value)` pairs.
62    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66    /// Issue #524 — public read of the in-memory chain tip. Returns `None`
67    /// when the collection is not a chain or has no rows (pre-genesis). On a
68    /// cold cache the first call falls back to a one-time scan so the HTTP
69    /// `GET /collections/:name/chain-tip` handler stays consistent with the
70    /// INSERT path after a restart.
71    pub fn chain_tip_for_collection(
72        &self,
73        collection: &str,
74    ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75        let store = self.inner.db.store();
76        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77            return None;
78        }
79        let mut cache = self.inner.chain_tip_cache.lock();
80        if let Some(existing) = cache.get(collection) {
81            return Some(existing.clone());
82        }
83        let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84        cache.insert(collection.to_string(), scanned.clone());
85        Some(scanned)
86    }
87
88    /// Issue #525 — walks the chain end-to-end, recomputes each block's hash
89    /// against the stored fields, and returns the verification outcome.  On
90    /// `ok == false` the integrity flag is persisted and the in-memory cache
91    /// is updated so subsequent INSERTs surface `ChainIntegrityBroken`.
92    ///
93    /// Returns `None` when the collection is absent or not a `KIND blockchain`.
94    pub fn verify_chain_for_collection(
95        &self,
96        collection: &str,
97    ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98        let store = self.inner.db.store();
99        let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100        if !outcome.ok {
101            crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102            self.inner
103                .chain_integrity_broken
104                .lock()
105                .insert(collection.to_string(), true);
106        }
107        Some(outcome)
108    }
109
110    /// Issue #525 — admin clears the `ChainIntegrityBroken` flag so the chain
111    /// accepts INSERTs again.  Returns `false` when the collection is not a
112    /// chain.
113    pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114        let store = self.inner.db.store();
115        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116            return false;
117        }
118        crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119        self.inner
120            .chain_integrity_broken
121            .lock()
122            .insert(collection.to_string(), false);
123        true
124    }
125
126    /// Issue #525 — INSERT-time check.  Combines in-memory cache (fast path)
127    /// with a one-time scan of `red_config` on cold start so the flag survives
128    /// restart.
129    fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130        {
131            let cache = self.inner.chain_integrity_broken.lock();
132            if let Some(v) = cache.get(collection) {
133                return *v;
134            }
135        }
136        let store = self.inner.db.store();
137        let persisted =
138            crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139                .unwrap_or(false);
140        self.inner
141            .chain_integrity_broken
142            .lock()
143            .insert(collection.to_string(), persisted);
144        persisted
145    }
146
147    /// Issue #765 / S6 — lazily hydrate the integrity-tombstone cache from
148    /// `red_config` on first access. Returns `true` when at least one
149    /// tombstone range is present. Subsequent calls observe the cached state
150    /// flag (`1` empty / `2` present) and skip the store scan.
151    fn ensure_integrity_tombstones_loaded(&self) -> bool {
152        use std::sync::atomic::Ordering;
153        match self
154            .inner
155            .integrity_tombstones_state
156            .load(Ordering::Relaxed)
157        {
158            1 => return false,
159            2 => return true,
160            _ => {}
161        }
162        // Cold: load under the cache lock so a concurrent reader cannot
163        // observe a half-populated vector.
164        let mut guard = self.inner.integrity_tombstones.lock();
165        if self
166            .inner
167            .integrity_tombstones_state
168            .load(Ordering::Relaxed)
169            == 0
170        {
171            let ranges = crate::runtime::integrity_tombstone::load_ranges(&self.inner.db.store());
172            let present = !ranges.is_empty();
173            *guard = ranges;
174            self.inner
175                .integrity_tombstones_state
176                .store(if present { 2 } else { 1 }, Ordering::Relaxed);
177        }
178        self.inner
179            .integrity_tombstones_state
180            .load(Ordering::Relaxed)
181            == 2
182    }
183
184    /// Issue #765 / S6 — durably record an integrity tombstone over the
185    /// inclusive RID range `[lo, hi]` of `table` (the committed rows of an
186    /// input stream whose end-to-end SHA-256 digest did not match). The range
187    /// is persisted to `red_config` (survives restart) and folded into the
188    /// in-memory cache so the same process filters it immediately.
189    pub fn record_integrity_tombstone(&self, table: &str, lo: u64, hi: u64) {
190        use std::sync::atomic::Ordering;
191        self.ensure_integrity_tombstones_loaded();
192        let mut guard = self.inner.integrity_tombstones.lock();
193        guard.push(crate::runtime::integrity_tombstone::TombstoneRange::new(
194            table.to_string(),
195            lo,
196            hi,
197        ));
198        crate::runtime::integrity_tombstone::persist_ranges(&self.inner.db.store(), &guard);
199        self.inner
200            .integrity_tombstones_state
201            .store(2, Ordering::Relaxed);
202    }
203
204    /// Issue #765 / S6 — snapshot of the currently-cached tombstone ranges.
205    /// Intended for tests and forensic surfaces; the read path uses
206    /// [`Self::filter_integrity_tombstoned`] which avoids the clone.
207    pub fn integrity_tombstone_ranges(
208        &self,
209    ) -> Vec<crate::runtime::integrity_tombstone::TombstoneRange> {
210        self.ensure_integrity_tombstones_loaded();
211        self.inner.integrity_tombstones.lock().clone()
212    }
213
214    /// Issue #765 / S6 — drop tombstoned rows from a SELECT result in place.
215    /// Fast no-op (one relaxed atomic load) when no tombstone has ever been
216    /// recorded. Clears `pre_serialized_json` when any row is removed so the
217    /// fast-path JSON cannot leak a filtered row back onto the wire.
218    pub fn filter_integrity_tombstoned(&self, result: &mut UnifiedResult) {
219        if !self.ensure_integrity_tombstones_loaded() {
220            return;
221        }
222        let guard = self.inner.integrity_tombstones.lock();
223        if guard.is_empty() {
224            return;
225        }
226        let before = result.records.len();
227        result.records.retain(|record| {
228            !crate::runtime::integrity_tombstone::record_tombstoned(&guard, record)
229        });
230        if result.records.len() != before {
231            result.pre_serialized_json = None;
232        }
233    }
234
235    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
236    /// target table is tenant-scoped and the user's column list does
237    /// not already name the tenant column.
238    ///
239    /// Returns:
240    /// * `Ok(None)` — no injection needed (non-tenant table, or user
241    ///   supplied the column explicitly). Caller uses the original
242    ///   query unchanged.
243    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
244    ///   + literal value appended to every row.
245    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
246    ///   the current session. Fails loudly so callers don't produce
247    ///   rows that RLS would then hide on read.
248    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
249        let Some(tenant_col) = self.tenant_column(&query.table) else {
250            return Ok(None);
251        };
252        // User already named the column (literal match) — trust them.
253        if query
254            .columns
255            .iter()
256            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
257        {
258            return Ok(None);
259        }
260
261        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
262        // nested key like `headers.tenant` we operate on the root
263        // column (`headers`) and set / add the nested path inside its
264        // JSON value. If the user named the root column we mutate in
265        // place; otherwise we create a fresh JSON column for every row.
266        if let Some(dot_pos) = tenant_col.find('.') {
267            let (root, tail) = tenant_col.split_at(dot_pos);
268            let tail = &tail[1..]; // drop leading '.'
269            return self.inject_dotted_tenant(query, root, tail);
270        }
271
272        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
273            return Err(RedDBError::Query(format!(
274                "INSERT into tenant-scoped table '{}' requires an active tenant — \
275                 run SET TENANT '<id>' first or name column '{}' explicitly",
276                query.table, tenant_col
277            )));
278        };
279
280        let mut augmented = query.clone();
281        augmented.columns.push(tenant_col);
282        let lit = Value::text(tenant_id.clone());
283        for row in augmented.values.iter_mut() {
284            row.push(lit.clone());
285        }
286        for row in augmented.value_exprs.iter_mut() {
287            row.push(crate::storage::query::ast::Expr::Literal {
288                value: lit.clone(),
289                span: crate::storage::query::ast::Span::synthetic(),
290            });
291        }
292        Ok(Some(augmented))
293    }
294
295    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
296    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
297    /// nested JSON instead of appending a flat column.
298    ///
299    /// Cases:
300    /// * Root column already in the INSERT list → mutate per-row JSON
301    ///   (parse, set path, re-serialize).
302    /// * Root column absent → create a fresh `{tail: tenant}` JSON
303    ///   object and append the root column to the INSERT.
304    fn inject_dotted_tenant(
305        &self,
306        query: &InsertQuery,
307        root: &str,
308        tail: &str,
309    ) -> RedDBResult<Option<InsertQuery>> {
310        let active_tenant = crate::runtime::impl_core::current_tenant();
311        let mut augmented = query.clone();
312        let root_idx = augmented
313            .columns
314            .iter()
315            .position(|c| c.eq_ignore_ascii_case(root));
316
317        if let Some(idx) = root_idx {
318            // User supplied the root column. Per-row: if the dotted
319            // tail is already present we trust the user (admin / bulk
320            // loader scenario); otherwise fill from the active
321            // tenant. An unbound tenant is only an error when some
322            // row actually needs filling.
323            for row in augmented.values.iter_mut() {
324                let Some(slot) = row.get_mut(idx) else {
325                    continue;
326                };
327                if dotted_tail_already_set(slot, tail) {
328                    continue;
329                }
330                let Some(tenant_id) = &active_tenant else {
331                    return Err(RedDBError::Query(format!(
332                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
333                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
334                        query.table, root, tail
335                    )));
336                };
337                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
338            }
339            // Expression row is kept in sync by re-wrapping the
340            // mutated literal; the canonical path will re-evaluate
341            // against the same JSON shape.
342            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
343                if let Some(slot) = row.get_mut(idx) {
344                    let new_value = augmented
345                        .values
346                        .get(row_idx)
347                        .and_then(|v| v.get(idx))
348                        .cloned()
349                        .unwrap_or(Value::Null);
350                    *slot = crate::storage::query::ast::Expr::Literal {
351                        value: new_value,
352                        span: crate::storage::query::ast::Span::synthetic(),
353                    };
354                }
355            }
356        } else {
357            // No root column in the INSERT list — auto-fill needs a
358            // bound tenant to synthesise one. Error loud so we never
359            // create a tenant-less row that RLS would then hide.
360            let Some(tenant_id) = &active_tenant else {
361                return Err(RedDBError::Query(format!(
362                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
363                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
364                    query.table, root, tail
365                )));
366            };
367            // Create a fresh JSON column with only the tenant path set.
368            augmented.columns.push(root.to_string());
369            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
370            for row in augmented.values.iter_mut() {
371                row.push(fresh.clone());
372            }
373            for row in augmented.value_exprs.iter_mut() {
374                row.push(crate::storage::query::ast::Expr::Literal {
375                    value: fresh.clone(),
376                    span: crate::storage::query::ast::Span::synthetic(),
377                });
378            }
379        }
380
381        Ok(Some(augmented))
382    }
383
384    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
385    /// `lsns` is empty because events fire at commit time.
386    fn delete_entities_batch(
387        &self,
388        collection: &str,
389        ids: &[EntityId],
390    ) -> RedDBResult<(u64, Vec<u64>)> {
391        if ids.is_empty() {
392            return Ok((0, vec![]));
393        }
394
395        let store = self.db().store();
396        let Some(manager) = store.get_collection(collection) else {
397            return Ok((0, vec![]));
398        };
399
400        let active_xid = self.current_xid();
401        let conn_id = crate::runtime::impl_core::current_connection_id();
402        let mut autocommit_xid = None;
403        let mut tombstoned_ids = Vec::new();
404        let mut tombstoned_entities = Vec::new();
405        let mut physical_delete_ids = Vec::new();
406        let table_row_resolver =
407            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
408
409        for &id in ids {
410            let Some(mut entity) = manager.get(id) else {
411                continue;
412            };
413            if matches!(entity.data, EntityData::Row(_)) {
414                let previous_xmax = entity.xmax;
415                if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
416                    if table_row_resolver.resolve_candidate(&entity).is_none() {
417                        continue;
418                    }
419                } else if entity.xmax != 0 {
420                    continue;
421                }
422
423                let xid = match active_xid {
424                    Some(xid) => xid,
425                    None => match autocommit_xid {
426                        Some(xid) => xid,
427                        None => {
428                            let mgr = self.snapshot_manager();
429                            let xid = mgr.begin();
430                            autocommit_xid = Some(xid);
431                            xid
432                        }
433                    },
434                };
435                entity.set_xmax(xid);
436                if manager.update(entity.clone()).is_ok() {
437                    if active_xid.is_some() {
438                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
439                    }
440                    tombstoned_entities.push(entity);
441                    tombstoned_ids.push(id);
442                }
443            } else {
444                physical_delete_ids.push(id);
445            }
446        }
447
448        if let Some(xid) = autocommit_xid {
449            self.snapshot_manager().commit(xid);
450        }
451
452        let mut affected = tombstoned_ids.len() as u64;
453        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
454        if active_xid.is_some() {
455            store
456                .persist_entities_to_pager(collection, &tombstoned_entities)
457                .map_err(|err| RedDBError::Internal(err.to_string()))?;
458        } else {
459            store
460                .persist_entities_to_pager(collection, &tombstoned_entities)
461                .map_err(|err| RedDBError::Internal(err.to_string()))?;
462            for id in &tombstoned_ids {
463                store.context_index().remove_entity(*id);
464                let lsn = self.cdc_emit(
465                    crate::replication::cdc::ChangeOperation::Delete,
466                    collection,
467                    id.raw(),
468                    "entity",
469                );
470                lsns.push(lsn);
471            }
472        }
473
474        let deleted_ids = store
475            .delete_batch(collection, &physical_delete_ids)
476            .map_err(|err| RedDBError::Internal(err.to_string()))?;
477        affected += deleted_ids.len() as u64;
478        for id in &deleted_ids {
479            store.context_index().remove_entity(*id);
480            let lsn = self.cdc_emit(
481                crate::replication::cdc::ChangeOperation::Delete,
482                collection,
483                id.raw(),
484                "entity",
485            );
486            lsns.push(lsn);
487        }
488
489        Ok((affected, lsns))
490    }
491
492    /// Flushes context-index updates and CDC for each applied mutation.
493    /// Returns one LSN per entity in the same order as `applied`.
494    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
495        if applied.is_empty() {
496            return Ok(Vec::new());
497        }
498
499        let store = self.db().store();
500        if applied.iter().any(|item| item.context_index_dirty) {
501            store.context_index().index_entities(
502                &applied[0].collection,
503                applied
504                    .iter()
505                    .filter(|item| item.context_index_dirty)
506                    .map(|item| &item.entity),
507            );
508        }
509
510        for item in applied {
511            self.refresh_update_secondary_indexes(item)?;
512        }
513
514        let mut lsns = Vec::with_capacity(applied.len());
515        for item in applied {
516            let lsn = self.cdc_emit_prebuilt(
517                crate::replication::cdc::ChangeOperation::Update,
518                &item.collection,
519                &item.entity,
520                update_cdc_item_kind(self, &item.collection, &item.entity),
521                item.metadata.as_ref(),
522                false,
523            );
524            lsns.push(lsn);
525        }
526        Ok(lsns)
527    }
528
529    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
        // Thin wrapper: hands the whole slice to
        // `persist_applied_entity_mutations` and returns its result unchanged.
530        self.persist_applied_entity_mutations(applied)
531    }
532
533    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
534        if applied.pre_mutation_fields.is_empty() {
535            return Ok(());
536        }
537        let post = entity_row_fields_snapshot(&applied.entity);
538        if post.is_empty() {
539            return Ok(());
540        }
541
542        let indexed_cols = self
543            .index_store_ref()
544            .indexed_columns_set(&applied.collection);
545        if indexed_cols.is_empty() {
546            return Ok(());
547        }
548
549        if let Some(old_version) = applied.replaced_entity.as_ref() {
550            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
551                .pre_mutation_fields
552                .iter()
553                .filter(|(col, _)| indexed_cols.contains(col))
554                .cloned()
555                .collect();
556            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
557                .iter()
558                .filter(|(col, _)| indexed_cols.contains(col))
559                .cloned()
560                .collect();
561            if !old_index_fields.is_empty() {
562                self.index_store_ref()
563                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
564                    .map_err(crate::RedDBError::Internal)?;
565            }
566            if !new_index_fields.is_empty() {
567                self.index_store_ref()
568                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
569                    .map_err(crate::RedDBError::Internal)?;
570            }
571            return Ok(());
572        }
573
574        let damage =
575            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
576        if damage
577            .touched_columns()
578            .into_iter()
579            .any(|col| indexed_cols.contains(col))
580        {
581            self.index_store_ref()
582                .index_entity_update(
583                    &applied.collection,
584                    applied.id,
585                    &applied.pre_mutation_fields,
586                    &post,
587                )
588                .map_err(crate::RedDBError::Internal)?;
589        }
590        Ok(())
591    }
592
593    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
594    ///
595    /// Each row in `query.values` is zipped with `query.columns` to produce a
596    /// set of named fields, which is then dispatched based on entity_type.
597    pub fn execute_insert(
598        &self,
599        raw_query: &str,
600        query: &InsertQuery,
601    ) -> RedDBResult<RuntimeQueryResult> {
602        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
603        // CollectionContract gate (#49): single entry point for the
604        // operator's collection-level write rules. Today this is a
605        // no-op for INSERT (APPEND ONLY permits insert); routing
606        // through the gate now means future contract bits — versioned,
607        // vault-only writes — plug in once instead of per verb.
608        crate::runtime::collection_contract::CollectionContractGate::check(
609            self,
610            &query.table,
611            crate::runtime::collection_contract::MutationKind::Insert,
612        )?;
613        // Phase 2.5.4 table-scoped tenancy: if the target table is
614        // tenant-scoped and the user didn't name the tenant column,
615        // auto-inject it with the thread-local `CURRENT_TENANT()`
616        // value. When the column is named explicitly we trust the
617        // caller (useful for admin tooling that writes on behalf of
618        // specific tenants). An unbound tenant on an implicit-fill
619        // path errors up front rather than producing a row the RLS
620        // policy would silently hide.
621        let augmented_owned;
622        let query = match self.maybe_inject_tenant_column(query)? {
623            Some(new_q) => {
624                augmented_owned = new_q;
625                &augmented_owned
626            }
627            None => query,
628        };
629        self.check_insert_column_policy(query)?;
630        if let Some(ref embed_config) = query.auto_embed {
631            let provider = crate::ai::parse_provider(&embed_config.provider)?;
632            // S3 / #711: planner-level provider gate. Runs before the
633            // local-model preflight and the API-key resolver so neither
634            // side-effect fires when policy denies.
635            crate::runtime::ai::provider_gate::enforce(self, &provider)?;
636            if matches!(provider, crate::ai::AiProvider::Local) {
637                // Issue #682 — pre-flight the local model registry before
638                // any row write. Missing model, uninstalled artifacts,
639                // wrong task, and disabled-feature failures surface as
640                // deterministic errors that leave the target collection
641                // untouched, satisfying the "no partial writes on
642                // embedding failure" criterion for the failure modes
643                // owned by the local provider.
644                let model_name = embed_config.model.as_deref().map(str::trim).unwrap_or("");
645                if model_name.is_empty() {
646                    return Err(RedDBError::Query(
647                        "AUTO EMBED with provider=local requires MODEL '<registered-model-name>'; \
648                         the local provider does not have an implicit default model"
649                            .to_string(),
650                    ));
651                }
652                crate::runtime::ai::local_embedding::preflight_local_embedding(
653                    &self.inner.db,
654                    model_name,
655                )?;
656            }
657        }
658
659        let mut inserted_count: u64 = 0;
660        let effective_rows =
661            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
662
663        // Ensure the collection exists (auto-create on first insert).
664        let store = self.inner.db.store();
665        let _ = store.get_or_create_collection(&query.table);
666        let declared_model = self
667            .db()
668            .collection_contract_arc(&query.table)
669            .map(|contract| contract.declared_model);
670
671        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
672            if query.returning.is_some() {
673                Some(Vec::with_capacity(effective_rows.len()))
674            } else {
675                None
676            };
677        let mut returning_result: Option<UnifiedResult> = None;
678
679        if matches!(query.entity_type, InsertEntityType::Row)
680            && !matches!(
681                declared_model,
682                Some(crate::catalog::CollectionModel::TimeSeries)
683            )
684        {
685            // Issue #523 + #524: blockchain collections seal each row into the
686            // chain. When the caller omits the reserved columns, the engine
687            // auto-fills (#523). When the caller supplies any reserved column,
688            // the values are validated against the current tip and a mismatch
689            // surfaces a `BlockchainConflict:` error mapped to HTTP 409 (#524).
690            //
691            // The whole batch runs under a per-collection chain lock so two
692            // concurrent submitters can't both bind to the same prev_hash —
693            // the loser observes the advanced tip and gets 409 with the new
694            // tip so it can retry.
695            let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
696            let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
697                Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
698            } else {
699                None
700            };
701            let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
702
703            // Issue #525 — refuse new blocks if the chain has been marked
704            // `integrity = broken` until an admin clears the flag.
705            if chain_mode && self.is_chain_integrity_broken(&query.table) {
706                return Err(RedDBError::InvalidOperation(format!(
707                    "ChainIntegrityBroken: collection '{}' is locked until \
708                     POST /collections/{}/clear-integrity-flag is called by an admin",
709                    query.table, query.table
710                )));
711            }
712
713            // Pull the tip from the in-memory cache; fall back to a one-time
714            // scan if the cache hasn't seen this collection yet (cold start
715            // after restart). Cache is updated below as rows are sealed.
716            let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
717                if chain_mode {
718                    let mut cache = self.inner.chain_tip_cache.lock();
719                    if let Some(existing) = cache.get(&query.table) {
720                        Some(existing.clone())
721                    } else if let Some(scanned) =
722                        crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
723                    {
724                        cache.insert(query.table.clone(), scanned.clone());
725                        Some(scanned)
726                    } else {
727                        None
728                    }
729                } else {
730                    None
731                };
732
733            let mut rows = Vec::with_capacity(effective_rows.len());
734            for row_values in &effective_rows {
735                if row_values.len() != query.columns.len() {
736                    return Err(RedDBError::Query(format!(
737                        "INSERT column count ({}) does not match value count ({})",
738                        query.columns.len(),
739                        row_values.len()
740                    )));
741                }
742                let (mut fields, mut metadata) =
743                    split_insert_metadata(self, &query.columns, row_values)?;
744                if chain_mode {
745                    use crate::runtime::blockchain_kind::{
746                        chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
747                        COL_TIMESTAMP, RESERVED_COLUMNS,
748                    };
749                    let supplied_height = fields
750                        .iter()
751                        .find(|(k, _)| k == COL_BLOCK_HEIGHT)
752                        .map(|(_, v)| v.clone());
753                    let supplied_prev = fields
754                        .iter()
755                        .find(|(k, _)| k == COL_PREV_HASH)
756                        .map(|(_, v)| v.clone());
757                    let supplied_ts = fields
758                        .iter()
759                        .find(|(k, _)| k == COL_TIMESTAMP)
760                        .map(|(_, v)| v.clone());
761                    let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
762                    let user_supplied_any = supplied_height.is_some()
763                        || supplied_prev.is_some()
764                        || supplied_ts.is_some()
765                        || supplied_hash;
766
767                    fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
768                    let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
769
770                    let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
771                        Some(t) => (t.hash, t.height + 1),
772                        None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
773                    };
774                    let server_now = crate::runtime::blockchain_kind::now_ms();
775
776                    let (use_prev, use_height, use_ts) = if user_supplied_any {
777                        // Caller is participating in the chain protocol —
778                        // every field must be supplied AND match the tip.
779                        if supplied_hash {
780                            return Err(chain_conflict_error(
781                                tip_next_height.saturating_sub(1),
782                                tip_prev_hash,
783                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
784                                server_now,
785                                "hash column is engine-computed and cannot be supplied",
786                            ));
787                        }
788                        let caller_prev = match &supplied_prev {
789                            Some(Value::Blob(b)) if b.len() == 32 => {
790                                let mut a = [0u8; 32];
791                                a.copy_from_slice(b);
792                                a
793                            }
794                            Some(Value::Text(s)) if s.len() == 64 => {
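795                                // Accept hex-encoded prev_hash so JSON / SQL
796                                // callers without literal-blob syntax can
797                                // still participate in the chain protocol. NOTE(review): `len() == 64` counts bytes, so the byte-range slice below can panic on non-ASCII text — TODO confirm and reject non-ASCII input first.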
798                                let mut a = [0u8; 32];
799                                let mut ok = true;
800                                for (i, slot) in a.iter_mut().enumerate() {
801                                    let pair = &s.as_ref()[i * 2..i * 2 + 2];
802                                    match u8::from_str_radix(pair, 16) {
803                                        Ok(byte) => *slot = byte,
804                                        Err(_) => {
805                                            ok = false;
806                                            break;
807                                        }
808                                    }
809                                }
810                                if !ok {
811                                    return Err(chain_conflict_error(
812                                        tip_next_height.saturating_sub(1),
813                                        tip_prev_hash,
814                                        chain_tip_full
815                                            .as_ref()
816                                            .map(|t| t.timestamp_ms)
817                                            .unwrap_or(0),
818                                        server_now,
819                                        "prev_hash is not valid hex",
820                                    ));
821                                }
822                                a
823                            }
824                            _ => {
825                                return Err(chain_conflict_error(
826                                    tip_next_height.saturating_sub(1),
827                                    tip_prev_hash,
828                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
829                                    server_now,
830                                    "prev_hash missing or not a 32-byte Blob",
831                                ));
832                            }
833                        };
834                        if caller_prev != tip_prev_hash {
835                            return Err(chain_conflict_error(
836                                tip_next_height.saturating_sub(1),
837                                tip_prev_hash,
838                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
839                                server_now,
840                                "prev_hash does not match current tip",
841                            ));
842                        }
843                        let caller_height = match &supplied_height {
844                            Some(Value::UnsignedInteger(v)) => *v,
845                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
846                            _ => {
847                                return Err(chain_conflict_error(
848                                    tip_next_height.saturating_sub(1),
849                                    tip_prev_hash,
850                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
851                                    server_now,
852                                    "block_height missing or not an unsigned integer",
853                                ));
854                            }
855                        };
856                        if caller_height != tip_next_height {
857                            return Err(chain_conflict_error(
858                                tip_next_height.saturating_sub(1),
859                                tip_prev_hash,
860                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
861                                server_now,
862                                "block_height does not match tip+1",
863                            ));
864                        }
865                        let caller_ts = match &supplied_ts {
866                            Some(Value::UnsignedInteger(v)) => *v,
867                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
868                            _ => {
869                                return Err(chain_conflict_error(
870                                    tip_next_height.saturating_sub(1),
871                                    tip_prev_hash,
872                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
873                                    server_now,
874                                    "timestamp missing or not an unsigned integer",
875                                ));
876                            }
877                        };
878                        let drift = (caller_ts as i128) - (server_now as i128);
879                        if drift.abs() > 60_000 {
880                            return Err(chain_conflict_error(
881                                tip_next_height.saturating_sub(1),
882                                tip_prev_hash,
883                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
884                                server_now,
885                                "timestamp outside ±60s of server_time",
886                            ));
887                        }
888                        (caller_prev, caller_height, caller_ts)
889                    } else {
890                        (tip_prev_hash, tip_next_height, server_now)
891                    };
892
893                    let (reserved, new_hash) =
894                        crate::runtime::blockchain_kind::make_block_reserved_fields(
895                            use_prev, use_height, use_ts, &payload,
896                        );
897                    fields.extend(reserved);
898                    chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
899                        height: use_height,
900                        hash: new_hash,
901                        timestamp_ms: use_ts,
902                    });
903                }
904                // Issue #522 — signed-writes verification. On collections
905                // created with `SIGNED_BY (...)` the row must carry valid
906                // `signer_pubkey` + `signature` reserved columns. Runs
907                // after chain_mode so canonical payload covers user-supplied
908                // fields only (blockchain reserved columns are filtered by
909                // `canonical_payload`; the two signed-writes reserved
910                // columns are split out before payload computation, then
911                // re-attached for storage). The blockchain + SIGNED_BY
912                // composition is owned by issue #526; we keep #522 to the
913                // non-chain path and let chain_mode collections punt to that
914                // slice rather than half-wire it here.
915                if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
916                    let (pk_col, sig_col, residual) =
917                        crate::runtime::signed_writes_kind::split_signature_fields(fields);
918                    let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
919                    let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
920                    crate::runtime::signed_writes_kind::verify_row(
921                        &reg,
922                        pk_col.as_ref().map(|c| c.bytes.as_slice()),
923                        sig_col.as_ref().map(|c| c.bytes.as_slice()),
924                        &payload,
925                    )
926                    .map_err(crate::runtime::signed_writes_kind::map_error)?;
927                    fields = residual;
928                    // Round-trip the reserved columns with the value
929                    // type the caller supplied (Text/hex on the SQL path,
930                    // Blob on the binary path). Keeps SELECT and WHERE
931                    // predicates symmetric with the INSERT shape.
932                    if let Some(col) = pk_col {
933                        fields.push((
934                            crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
935                            col.raw_value,
936                        ));
937                    }
938                    if let Some(col) = sig_col {
939                        fields.push((
940                            crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
941                            col.raw_value,
942                        ));
943                    }
944                }
945                merge_with_clauses(
946                    &mut metadata,
947                    query.ttl_ms,
948                    query.expires_at_ms,
949                    &query.with_metadata,
950                );
951                if let Some(snaps) = returning_snapshots.as_mut() {
952                    snaps.push(fields.clone());
953                }
954                rows.push(CreateRowInput {
955                    collection: query.table.clone(),
956                    fields,
957                    metadata,
958                    node_links: Vec::new(),
959                    vector_links: Vec::new(),
960                });
961            }
962            let outputs = self.create_rows_batch(CreateRowsBatchInput {
963                collection: query.table.clone(),
964                rows,
965                suppress_events: query.suppress_events,
966            })?;
967            inserted_count = outputs.len() as u64;
968
969            // Chain mode: commit the new tip to the in-memory cache only after
970            // the batch persisted successfully. If the batch failed, the `?` above
971            // already returned, so the cache stays on the previous tip and the chain lock releases.
972            if chain_mode {
973                if let Some(new_tip) = chain_tip_full.as_ref() {
974                    self.inner
975                        .chain_tip_cache
976                        .lock()
977                        .insert(query.table.clone(), new_tip.clone());
978                }
979            }
980
981            // Hypertable chunk routing: if this table was declared via
982            // CREATE HYPERTABLE, register each row's time-column value
983            // with the registry so chunk metadata (bounds, row counts,
984            // TTL eligibility) stays current. This is what lets
985            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
986            // retention daemon sweep expired chunks without scanning
987            // every row.
988            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
989                let time_col = &spec.time_column;
990                // Find the column's index in the INSERT column list.
991                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
992                    for row in &effective_rows {
993                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
994                            if *n >= 0 {
995                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
996                            }
997                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
998                            let _ = self.inner.db.hypertables().route(&query.table, *n);
999                        }
1000                    }
1001                }
1002            }
1003
1004            if let (Some(items), Some(snaps)) =
1005                (query.returning.as_ref(), returning_snapshots.take())
1006            {
1007                let snaps = row_insert_returning_snapshots(&outputs, snaps);
1008                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
1009            }
1010        } else {
1011            // Issue #419: surface the inserted entity id on every INSERT path.
1012            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
1013            // so a RETURNING clause (and the unconditional inserted_ids list,
1014            // below) can expose the engine-assigned id. TimeSeries (the Row
1015            // branch in this else) still rejects RETURNING with a not-supported
1016            // error because create_timeseries_point isn't plumbed through this fn.
1017            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
1018                Vec::with_capacity(effective_rows.len());
1019            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
1020            {
1021                Vec::with_capacity(effective_rows.len())
1022            } else {
1023                Vec::new()
1024            };
1025            if matches!(
1026                query.entity_type,
1027                InsertEntityType::Node | InsertEntityType::Edge
1028            ) {
1029                enum PreparedGraphInsert { // One validated NODE/EDGE row, staged until every row has been prepared and the batch is built.
1030                    Node { // Pushed by the `InsertEntityType::Node` arm of the preparation loop.
1031                        fields: Vec<(String, Value)>, // Column/value pairs returned by `split_insert_metadata`; pushed to the RETURNING snapshots when RETURNING is present.
1032                        input: CreateNodeInput, // Collection, label, node type, properties and metadata later passed to `add_node_with_type`.
1033                    },
1034                    Edge { // Pushed by the `InsertEntityType::Edge` arm of the preparation loop.
1035                        fields: Vec<(String, Value)>, // Same role as the `Node` variant's `fields`, for the edge row.
1036                        input: CreateEdgeInput, // Collection, label, endpoints, weight, properties and metadata later passed to `add_edge`.
1037                    },
1038                }
1039
1040                let mut prepared = Vec::with_capacity(effective_rows.len());
1041                for row_values in &effective_rows {
1042                    if row_values.len() != query.columns.len() {
1043                        return Err(RedDBError::Query(format!(
1044                            "INSERT column count ({}) does not match value count ({})",
1045                            query.columns.len(),
1046                            row_values.len()
1047                        )));
1048                    }
1049
1050                    match query.entity_type {
1051                        InsertEntityType::Node => {
1052                            let (node_values, mut metadata) =
1053                                split_insert_metadata(self, &query.columns, row_values)?;
1054                            merge_with_clauses(
1055                                &mut metadata,
1056                                query.ttl_ms,
1057                                query.expires_at_ms,
1058                                &query.with_metadata,
1059                            );
1060                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
1061                            apply_collection_default_ttl_metadata(
1062                                self,
1063                                &query.table,
1064                                &mut metadata,
1065                            );
1066                            let (columns, values) = pairwise_columns_values(&node_values);
1067                            let label = find_column_value_string(&columns, &values, "label")?;
1068                            let node_type =
1069                                find_column_value_opt_string(&columns, &values, "node_type");
1070                            let properties = extract_remaining_properties(
1071                                &columns,
1072                                &values,
1073                                &["label", "node_type"],
1074                            );
1075                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1076                                properties.iter().map(|(key, _)| key.as_str()),
1077                                &format!("node '{}'", query.table),
1078                            )?;
1079                            prepared.push(PreparedGraphInsert::Node {
1080                                fields: node_values,
1081                                input: CreateNodeInput {
1082                                    collection: query.table.clone(),
1083                                    label,
1084                                    node_type,
1085                                    properties,
1086                                    metadata,
1087                                    embeddings: Vec::new(),
1088                                    table_links: Vec::new(),
1089                                    node_links: Vec::new(),
1090                                },
1091                            });
1092                        }
1093                        InsertEntityType::Edge => {
1094                            let (edge_values, mut metadata) =
1095                                split_insert_metadata(self, &query.columns, row_values)?;
1096                            merge_with_clauses(
1097                                &mut metadata,
1098                                query.ttl_ms,
1099                                query.expires_at_ms,
1100                                &query.with_metadata,
1101                            );
1102                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
1103                            apply_collection_default_ttl_metadata(
1104                                self,
1105                                &query.table,
1106                                &mut metadata,
1107                            );
1108                            let (columns, values) = pairwise_columns_values(&edge_values);
1109                            let label = find_column_value_string(&columns, &values, "label")?;
1110                            ensure_non_tree_structural_edge_label(&label)?;
1111                            let from_id = resolve_edge_endpoint_any(
1112                                self.inner.db.store().as_ref(),
1113                                &query.table,
1114                                &columns,
1115                                &values,
1116                                &["from_rid", "from"],
1117                            )?;
1118                            let to_id = resolve_edge_endpoint_any(
1119                                self.inner.db.store().as_ref(),
1120                                &query.table,
1121                                &columns,
1122                                &values,
1123                                &["to_rid", "to"],
1124                            )?;
1125                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
1126                            let properties = extract_remaining_properties(
1127                                &columns,
1128                                &values,
1129                                &["label", "from_rid", "to_rid", "from", "to", "weight"],
1130                            );
1131                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1132                                properties.iter().map(|(key, _)| key.as_str()),
1133                                &format!("edge '{}'", query.table),
1134                            )?;
1135                            prepared.push(PreparedGraphInsert::Edge {
1136                                fields: edge_values,
1137                                input: CreateEdgeInput {
1138                                    collection: query.table.clone(),
1139                                    label,
1140                                    from: EntityId::new(from_id),
1141                                    to: EntityId::new(to_id),
1142                                    weight,
1143                                    properties,
1144                                    metadata,
1145                                },
1146                            });
1147                        }
1148                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1149                    }
1150                }
1151
1152                ensure_graph_insert_contract(self, &query.table)?;
1153                let mut batch = self.inner.db.batch();
1154                for item in prepared {
1155                    match item {
1156                        PreparedGraphInsert::Node { fields, input } => {
1157                            if query.returning.is_some() {
1158                                returning_field_snaps.push(fields);
1159                            }
1160                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1161                            batch = batch.add_node_with_type(
1162                                input.collection,
1163                                input.label,
1164                                node_type,
1165                                input.properties.into_iter().collect(),
1166                                input.metadata.into_iter().collect(),
1167                            );
1168                        }
1169                        PreparedGraphInsert::Edge { fields, input } => {
1170                            if query.returning.is_some() {
1171                                returning_field_snaps.push(fields);
1172                            }
1173                            batch = batch.add_edge(
1174                                input.collection,
1175                                input.label,
1176                                input.from,
1177                                input.to,
1178                                input.weight.unwrap_or(1.0),
1179                                input.properties.into_iter().collect(),
1180                                input.metadata.into_iter().collect(),
1181                            );
1182                        }
1183                    }
1184                }
1185                let batch_result = batch
1186                    .execute()
1187                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1188                let (ids, entity_kind) = match query.entity_type {
1189                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1190                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1191                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1192                };
1193                for id in &ids {
1194                    self.stamp_xmin_if_in_txn(&query.table, *id);
1195                }
1196                if query.returning.is_some() {
1197                    returning_field_snaps = graph_insert_returning_snapshots(
1198                        self.inner.db.store().as_ref(),
1199                        &query.table,
1200                        &ids,
1201                    );
1202                }
1203                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1204                let store = self.inner.db.store();
1205                entity_outputs.extend(ids.iter().map(|id| {
1206                    crate::application::entity::CreateEntityOutput {
1207                        id: *id,
1208                        entity: store.get(&query.table, *id),
1209                    }
1210                }));
1211                inserted_count = ids.len() as u64;
1212            } else {
1213                for row_values in &effective_rows {
1214                    if row_values.len() != query.columns.len() {
1215                        return Err(RedDBError::Query(format!(
1216                            "INSERT column count ({}) does not match value count ({})",
1217                            query.columns.len(),
1218                            row_values.len()
1219                        )));
1220                    }
1221
1222                    match query.entity_type {
1223                        InsertEntityType::Row => {
1224                            if query.returning.is_some() {
1225                                return Err(RedDBError::Query(
1226                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1227                                    .to_string(),
1228                            ));
1229                            }
1230                            let (fields, mut metadata) =
1231                                split_insert_metadata(self, &query.columns, row_values)?;
1232                            merge_with_clauses(
1233                                &mut metadata,
1234                                query.ttl_ms,
1235                                query.expires_at_ms,
1236                                &query.with_metadata,
1237                            );
1238                            self.insert_timeseries_point(&query.table, fields, metadata)?;
1239                        }
1240                        InsertEntityType::Node | InsertEntityType::Edge => {
1241                            unreachable!("NODE and EDGE are handled by the prepared graph path")
1242                        }
1243                        InsertEntityType::Vector => {
1244                            let (vector_values, mut metadata) =
1245                                split_insert_metadata(self, &query.columns, row_values)?;
1246                            merge_with_clauses(
1247                                &mut metadata,
1248                                query.ttl_ms,
1249                                query.expires_at_ms,
1250                                &query.with_metadata,
1251                            );
1252                            let (columns, values) = pairwise_columns_values(&vector_values);
1253                            let dense = find_column_value_vec_f32_any(
1254                                &columns,
1255                                &values,
1256                                &["dense", "embedding"],
1257                            )?;
1258                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1259                            let content =
1260                                find_column_value_opt_string(&columns, &values, "content");
1261                            if query.returning.is_some() {
1262                                returning_field_snaps.push(vector_values.clone());
1263                            }
1264                            let input = CreateVectorInput {
1265                                collection: query.table.clone(),
1266                                dense,
1267                                content,
1268                                metadata,
1269                                link_row: None,
1270                                link_node: None,
1271                            };
1272                            entity_outputs.push(self.create_vector(input)?);
1273                        }
1274                        InsertEntityType::Document => {
1275                            let (document_values, mut metadata) =
1276                                split_insert_metadata(self, &query.columns, row_values)?;
1277                            merge_with_clauses(
1278                                &mut metadata,
1279                                query.ttl_ms,
1280                                query.expires_at_ms,
1281                                &query.with_metadata,
1282                            );
1283                            let (columns, values) = pairwise_columns_values(&document_values);
1284                            let body_str = find_column_value_string(&columns, &values, "body")?;
1285                            let body: crate::json::Value = crate::json::from_str(&body_str)
1286                                .map_err(|e| {
1287                                    RedDBError::Query(format!("invalid JSON body: {e}"))
1288                                })?;
1289                            let input = CreateDocumentInput {
1290                                collection: query.table.clone(),
1291                                body,
1292                                metadata,
1293                                node_links: Vec::new(),
1294                                vector_links: Vec::new(),
1295                            };
1296                            let output = self.create_document(input)?;
1297                            if query.returning.is_some() {
1298                                let fields = output
1299                                    .entity
1300                                    .as_ref()
1301                                    .map(entity_row_fields_snapshot)
1302                                    .filter(|fields| !fields.is_empty())
1303                                    .unwrap_or(document_values);
1304                                returning_field_snaps.push(fields);
1305                            }
1306                            entity_outputs.push(output);
1307                        }
1308                        InsertEntityType::Kv => {
1309                            let (kv_values, mut metadata) =
1310                                split_insert_metadata(self, &query.columns, row_values)?;
1311                            merge_with_clauses(
1312                                &mut metadata,
1313                                query.ttl_ms,
1314                                query.expires_at_ms,
1315                                &query.with_metadata,
1316                            );
1317                            let (columns, values) = pairwise_columns_values(&kv_values);
1318                            let key = find_column_value_string(&columns, &values, "key")?;
1319                            let value = find_column_value(&columns, &values, "value")?;
1320                            if query.returning.is_some() {
1321                                returning_field_snaps.push(kv_values.clone());
1322                            }
1323                            let input = CreateKvInput {
1324                                collection: query.table.clone(),
1325                                key,
1326                                value,
1327                                metadata,
1328                            };
1329                            entity_outputs.push(self.create_kv(input)?);
1330                        }
1331                    }
1332
1333                    inserted_count += 1;
1334                }
1335            }
1336
1337            if let Some(items) = query.returning.as_ref() {
1338                if !entity_outputs.is_empty() {
1339                    returning_result = Some(build_returning_result(
1340                        items,
1341                        &returning_field_snaps,
1342                        Some(&entity_outputs),
1343                    ));
1344                }
1345            }
1346        }
1347
1348        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
1349        if let Some(ref embed_config) = query.auto_embed {
1350            let store = self.inner.db.store();
1351            let provider = crate::ai::parse_provider(&embed_config.provider)?;
1352            let is_local_provider = matches!(provider, crate::ai::AiProvider::Local);
1353            // Local provider runs in-process — no API key path applies.
1354            // The pre-flight above already required `MODEL '<name>'`
1355            // for the local case, so the unwrap_or default below only
1356            // ever fires for OpenAI-compatible providers.
1357            let api_key = if is_local_provider {
1358                String::new()
1359            } else {
1360                crate::ai::resolve_api_key_from_runtime(&provider, None, self)?
1361            };
1362            let model = embed_config.model.clone().unwrap_or_else(|| {
1363                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1364                    .ok()
1365                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1366            });
1367
1368            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
1369            let manager = store
1370                .get_collection(&query.table)
1371                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1372            let entities = manager.query_all(|_| true);
1373            let recent: Vec<_> = entities
1374                .into_iter()
1375                .rev()
1376                .take(effective_rows.len())
1377                .collect();
1378
1379            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
1380            let entity_combos: Vec<(usize, String)> = recent
1381                .iter()
1382                .enumerate()
1383                .filter_map(|(i, entity)| {
1384                    if let EntityData::Row(ref row) = entity.data {
1385                        if let Some(ref named) = row.named {
1386                            let texts: Vec<String> = embed_config
1387                                .fields
1388                                .iter()
1389                                .filter_map(|field| match named.get(field) {
1390                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1391                                    _ => None,
1392                                })
1393                                .collect();
1394                            if !texts.is_empty() {
1395                                return Some((i, texts.join(" ")));
1396                            }
1397                        }
1398                    }
1399                    None
1400                })
1401                .collect();
1402
1403            if !entity_combos.is_empty() {
1404                // Batch phase: single provider round-trip for all rows.
1405                let batch_texts: Vec<String> =
1406                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
1407
1408                // Issue #682 — when the provider is `local`, bypass
1409                // AiBatchClient (which is HTTP-only) and dispatch
1410                // directly through the in-process local embedding
1411                // backend. All texts go in one call, mirroring the
1412                // single-round-trip shape of the remote path. The
1413                // local backend does not perform intra-batch dedup —
1414                // each input position gets its own row in the output
1415                // — which keeps the per-row "create_vector" loop
1416                // below correct without additional fan-out logic.
1417                let embeddings = if is_local_provider {
1418                    let response = crate::runtime::ai::local_embedding::embed_local_with_db(
1419                        &self.inner.db,
1420                        &model,
1421                        batch_texts,
1422                    )?;
1423                    response.embeddings
1424                } else {
1425                    let batch_client =
1426                        crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1427
1428                    match tokio::runtime::Handle::try_current() {
1429                        Ok(handle) => tokio::task::block_in_place(|| {
1430                            handle.block_on(batch_client.embed_batch(
1431                                &provider,
1432                                &model,
1433                                &api_key,
1434                                batch_texts,
1435                            ))
1436                        }),
1437                        Err(_) => {
1438                            return Err(RedDBError::Query(
1439                                "AUTO EMBED requires a Tokio runtime context".to_string(),
1440                            ));
1441                        }
1442                    }
1443                    .map_err(|e| RedDBError::Query(e.to_string()))?
1444                };
1445
1446                // Distribute phase: persist one vector per non-empty embedding.
1447                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1448                    if dense.is_empty() {
1449                        continue;
1450                    }
1451                    self.create_vector(CreateVectorInput {
1452                        collection: query.table.clone(),
1453                        dense,
1454                        content: Some(combined.clone()),
1455                        metadata: Vec::new(),
1456                        link_row: None,
1457                        link_node: None,
1458                    })?;
1459                }
1460            }
1461        }
1462
1463        if inserted_count > 0 {
1464            self.note_table_write(&query.table);
1465        }
1466
1467        let mut result = RuntimeQueryResult::dml_result(
1468            raw_query.to_string(),
1469            inserted_count,
1470            "insert",
1471            "runtime-dml",
1472        );
1473        if let Some(returning) = returning_result {
1474            result.result = returning;
1475        }
1476        Ok(result)
1477    }
1478
1479    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1480        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1481            return Ok(());
1482        };
1483        if !auth_store.iam_authorization_enabled() {
1484            return Ok(());
1485        }
1486        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1487            return Ok(());
1488        };
1489
1490        let tenant = crate::runtime::impl_core::current_tenant();
1491        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1492        let request = crate::auth::ColumnAccessRequest {
1493            action: "insert".to_string(),
1494            schema: None,
1495            table: query.table.clone(),
1496            columns: query.columns.clone(),
1497        };
1498        let ctx = crate::auth::policies::EvalContext {
1499            principal_tenant: tenant.clone(),
1500            current_tenant: tenant,
1501            peer_ip: None,
1502            mfa_present: false,
1503            now_ms: crate::auth::now_ms(),
1504            principal_is_admin_role: role == crate::auth::Role::Admin,
1505            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
1506            principal_is_platform_scoped: principal.tenant.is_none(),
1507        };
1508
1509        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1510        let table_allowed = matches!(
1511            outcome.table_decision,
1512            crate::auth::policies::Decision::Allow { .. }
1513                | crate::auth::policies::Decision::AdminBypass
1514        );
1515        if !table_allowed {
1516            return Err(RedDBError::Query(format!(
1517                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1518                outcome.table_resource.kind, outcome.table_resource.name
1519            )));
1520        }
1521        if let Some(denied) = outcome.first_denied_column() {
1522            return Err(RedDBError::Query(format!(
1523                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1524                denied.resource.kind, denied.resource.name
1525            )));
1526        }
1527
1528        Ok(())
1529    }
1530
1531    pub(crate) fn insert_timeseries_point(
1532        &self,
1533        collection: &str,
1534        fields: Vec<(String, Value)>,
1535        mut metadata: Vec<(String, MetadataValue)>,
1536    ) -> RedDBResult<EntityId> {
1537        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1538
1539        let (columns, values) = pairwise_columns_values(&fields);
1540        validate_timeseries_insert_columns(&columns)?;
1541
1542        // Issue #577 — AnalyticsSchemaRegistry hook. If the row carries
1543        // an `event_name` whose schema is registered, validate the
1544        // `payload` JSON against it BEFORE any write side-effect. On
1545        // failure we return a typed error and the row is not
1546        // persisted. When no schema is registered for the event name
1547        // (or no `event_name` column is supplied at all) we fall
1548        // through to the normal write path for back-compat with
1549        // existing timeseries rows.
1550        let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1551        let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1552        if let Some(event_name) = event_name_opt.as_deref() {
1553            let store_for_schema = self.inner.db.store();
1554            if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1555                .is_some()
1556            {
1557                let payload_json = payload_opt.as_deref().unwrap_or("{}");
1558                super::analytics_schema_registry::validate(
1559                    store_for_schema.as_ref(),
1560                    event_name,
1561                    payload_json,
1562                )
1563                .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1564            }
1565        }
1566
1567        // `metric` is required by the existing timeseries write path;
1568        // when an analytics-style row supplies `event_name` but not
1569        // `metric`, fall back to the event name so the storage path
1570        // still has a non-empty metric tag.
1571        let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1572            Some(m) => m,
1573            None => event_name_opt.clone().ok_or_else(|| {
1574                RedDBError::Query(
1575                    "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1576                )
1577            })?,
1578        };
1579        // `value` is optional for analytics-event rows (which are
1580        // semantically counts of 1); default to 1.0 when missing so
1581        // analytics inserts don't have to fabricate a metric value.
1582        let value = match find_column_value_opt_string(&columns, &values, "value") {
1583            Some(s) => s.parse::<f64>().unwrap_or(1.0),
1584            None => columns
1585                .iter()
1586                .position(|c| c.eq_ignore_ascii_case("value"))
1587                .and_then(|i| match &values[i] {
1588                    Value::Float(f) => Some(*f),
1589                    Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1590                    Value::UnsignedInteger(n) => Some(*n as f64),
1591                    _ => None,
1592                })
1593                .unwrap_or(1.0),
1594        };
1595        let timestamp_ns =
1596            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1597        let mut tags = find_timeseries_tags(&columns, &values)?;
1598        if let Some(ref name) = event_name_opt {
1599            tags.entry("event_name".to_string())
1600                .or_insert_with(|| name.clone());
1601        }
1602        if let Some(ref payload) = payload_opt {
1603            tags.entry("payload".to_string())
1604                .or_insert_with(|| payload.clone());
1605        }
1606
1607        let mut entity = UnifiedEntity::new(
1608            EntityId::new(0),
1609            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1610                series: collection.to_string(),
1611                metric: metric.clone(),
1612            })),
1613            EntityData::TimeSeries(crate::storage::TimeSeriesData {
1614                metric,
1615                timestamp_ns,
1616                value,
1617                tags,
1618            }),
1619        );
1620        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
1621        // or an autocommit xid (allocated and committed up-front so
1622        // future snapshots see the row as soon as it lands).
1623        let writer_xid = match self.current_xid() {
1624            Some(xid) => xid,
1625            None => {
1626                let mgr = self.snapshot_manager();
1627                let xid = mgr.begin();
1628                mgr.commit(xid);
1629                xid
1630            }
1631        };
1632        entity.set_xmin(writer_xid);
1633
1634        let store = self.inner.db.store();
1635        let id = store
1636            .insert_auto(collection, entity)
1637            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1638
1639        if !metadata.is_empty() {
1640            let _ = store.set_metadata(
1641                collection,
1642                id,
1643                Metadata::with_fields(metadata.into_iter().collect()),
1644            );
1645        }
1646
1647        self.cdc_emit(
1648            crate::replication::cdc::ChangeOperation::Insert,
1649            collection,
1650            id.raw(),
1651            "timeseries",
1652        );
1653
1654        Ok(id)
1655    }
1656
1657    /// Execute UPDATE table SET col=val, ... WHERE filter
1658    ///
1659    /// Scans the target collection, evaluates the WHERE filter against each
1660    /// record, and patches every matching entity.
1661    pub fn execute_update(
1662        &self,
1663        raw_query: &str,
1664        query: &UpdateQuery,
1665    ) -> RedDBResult<RuntimeQueryResult> {
1666        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1667        // Issue #523 — blockchain collections are immutable. Reject before
1668        // RLS / RETURNING work so the operator sees a clean 409-mapped
1669        // error instead of a partially-applied mutation surface.
1670        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1671            return Err(RedDBError::InvalidOperation(format!(
1672                "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1673                query.table
1674            )));
1675        }
1676        // CollectionContract gate (#50): runs the APPEND ONLY guard
1677        // (and any future contract bits) before RLS / RETURNING work
1678        // so the operator's immutability declaration is honoured
1679        // uniformly and the error message points at the DDL rather
1680        // than at a downstream symptom.
1681        crate::runtime::collection_contract::CollectionContractGate::check(
1682            self,
1683            &query.table,
1684            crate::runtime::collection_contract::MutationKind::Update,
1685        )?;
1686        ensure_update_target_contract(self, &query.table, query.target)?;
1687        ensure_graph_identity_update_target_allowed(query)?;
1688
1689        // Apply RLS augmentation first so every downstream path — plain
1690        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
1691        // same policy-filtered target set. This prevents RETURNING
1692        // from ever exposing rows the UPDATE policy would have
1693        // denied.
1694        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1695        let augmented_query: UpdateQuery;
1696        let effective_query: &UpdateQuery = if rls_gated {
1697            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1698                self,
1699                &query.table,
1700                crate::storage::query::ast::PolicyAction::Update,
1701            );
1702            let Some(policy) = rls_filter else {
1703                // No admitting policy: zero rows affected, empty
1704                // RETURNING (never leak rows the caller can't touch).
1705                let mut response = RuntimeQueryResult::dml_result(
1706                    raw_query.to_string(),
1707                    0,
1708                    "update",
1709                    "runtime-dml-rls",
1710                );
1711                if let Some(items) = query.returning.clone() {
1712                    response.result = build_returning_result(&items, &[], None);
1713                }
1714                return Ok(response);
1715            };
1716            let mut augmented = query.clone();
1717            augmented.filter = Some(match augmented.filter.take() {
1718                Some(existing) => {
1719                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1720                }
1721                None => policy,
1722            });
1723            augmented_query = augmented;
1724            &augmented_query
1725        } else {
1726            query
1727        };
1728
1729        // RETURNING wraps the inner executor and uses the touched-id
1730        // list the inner reports so the post-image reflects exactly
1731        // the rows the UPDATE actually mutated (not whatever a
1732        // separate SELECT might have observed).
1733        if let Some(items) = effective_query.returning.clone() {
1734            let mut inner_query = effective_query.clone();
1735            inner_query.returning = None;
1736            let (mut response, touched_ids) =
1737                self.execute_update_inner_tracked(raw_query, &inner_query)?;
1738
1739            let snapshots = if matches!(
1740                effective_query.target,
1741                UpdateTarget::Nodes | UpdateTarget::Edges
1742            ) {
1743                graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1744            } else {
1745                super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1746                    .row_snapshots(&touched_ids)
1747            };
1748
1749            response.result = build_returning_result(&items, &snapshots, None);
1750            response.engine = "runtime-dml-returning";
1751            return Ok(response);
1752        }
1753
1754        self.execute_update_inner(raw_query, effective_query)
1755    }
1756
1757    /// Back-compat shim: the older entry point ignored touched ids.
1758    fn execute_update_inner(
1759        &self,
1760        raw_query: &str,
1761        query: &UpdateQuery,
1762    ) -> RedDBResult<RuntimeQueryResult> {
1763        self.execute_update_inner_tracked(raw_query, query)
1764            .map(|(res, _)| res)
1765    }
1766
1767    fn execute_update_inner_tracked(
1768        &self,
1769        raw_query: &str,
1770        query: &UpdateQuery,
1771    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1772        let store = self.inner.db.store();
1773        let effective_filter = effective_update_filter(query);
1774        let compiled_plan = self.compile_update_plan(query)?;
1775        let needs_rmw_lock = update_needs_rmw_lock(query);
1776        let table_rmw_lock = if needs_rmw_lock {
1777            Some(
1778                self.inner
1779                    .rmw_locks
1780                    .lock_for(&query.table, "__table_rmw_update__"),
1781            )
1782        } else {
1783            None
1784        };
1785        let _table_rmw_guard = table_rmw_lock.as_ref().map(|lock| lock.lock());
1786        let mut touched_ids: Vec<EntityId> = Vec::new();
1787        let limit_cap = query.limit.map(|l| l as usize);
1788        let manager = store
1789            .get_collection(&query.table)
1790            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1791        let scan_limit = if query.order_by.is_empty() {
1792            limit_cap
1793        } else {
1794            None
1795        };
1796        let mut target_scan = super::dml_target_scan::DmlTargetScan::with_update_target(
1797            self,
1798            &query.table,
1799            effective_filter.as_ref(),
1800            scan_limit,
1801            query.target,
1802        );
1803        if needs_rmw_lock {
1804            target_scan = target_scan.with_live_table_rows();
1805        }
1806        let ids_to_update = target_scan.find_target_ids()?;
1807        let ids_to_update = if query.order_by.is_empty() {
1808            ids_to_update
1809        } else {
1810            ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1811        };
1812
1813        if needs_rmw_lock {
1814            return self.execute_update_inner_tracked_locked(
1815                raw_query,
1816                query,
1817                &compiled_plan,
1818                &ids_to_update,
1819                effective_filter.as_ref(),
1820            );
1821        }
1822
1823        let mut affected: u64 = 0;
1824        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1825            let mut applied_chunk = Vec::with_capacity(chunk.len());
1826            for entity in manager.get_many(chunk).into_iter().flatten() {
1827                let assignments =
1828                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1829                let applied = self.apply_materialized_update_for_entity(
1830                    query.table.clone(),
1831                    entity,
1832                    &compiled_plan,
1833                    assignments,
1834                )?;
1835                touched_ids.push(applied.id);
1836                applied_chunk.push(applied);
1837            }
1838            self.persist_update_chunk(&applied_chunk)?;
1839            affected += applied_chunk.len() as u64;
1840            let lsns = self.flush_update_chunk(&applied_chunk)?;
1841            if !query.suppress_events {
1842                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1843            }
1844        }
1845
1846        if affected > 0 {
1847            self.note_table_write(&query.table);
1848        }
1849
1850        Ok((
1851            RuntimeQueryResult::dml_result(
1852                raw_query.to_string(),
1853                affected,
1854                "update",
1855                "runtime-dml",
1856            ),
1857            touched_ids,
1858        ))
1859    }
1860
1861    fn execute_update_inner_tracked_locked(
1862        &self,
1863        raw_query: &str,
1864        query: &UpdateQuery,
1865        compiled_plan: &CompiledUpdatePlan,
1866        ids_to_update: &[EntityId],
1867        effective_filter: Option<&Filter>,
1868    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1869        let store = self.inner.db.store();
1870        let mut touched_ids = Vec::new();
1871        let mut lock_entries = Vec::new();
1872
1873        for id in ids_to_update {
1874            let Some(candidate) = store.get(&query.table, *id) else {
1875                continue;
1876            };
1877            let logical_id = candidate.logical_id();
1878            let lock_key = format!("row:{}", logical_id.raw());
1879            let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1880            lock_entries.push((lock_key, logical_id, rmw_lock));
1881        }
1882
1883        lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1884        lock_entries.dedup_by(|left, right| left.0 == right.0);
1885        let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1886
1887        let mut applied_chunk = Vec::new();
1888        for (_, logical_id, _) in &lock_entries {
1889            let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1890            else {
1891                continue;
1892            };
1893            if let Some(filter) = effective_filter {
1894                if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1895                    Some(self.inner.db.as_ref()),
1896                    &entity,
1897                    filter,
1898                    &query.table,
1899                    &query.table,
1900                ) {
1901                    continue;
1902                }
1903            }
1904
1905            let assignments =
1906                self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1907            let applied = self.apply_materialized_update_for_entity(
1908                query.table.clone(),
1909                entity,
1910                compiled_plan,
1911                assignments,
1912            )?;
1913            touched_ids.push(applied.id);
1914            applied_chunk.push(applied);
1915        }
1916
1917        let affected = applied_chunk.len() as u64;
1918        if !applied_chunk.is_empty() {
1919            self.persist_update_chunk(&applied_chunk)?;
1920            let lsns = self.flush_update_chunk(&applied_chunk)?;
1921            if !query.suppress_events {
1922                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1923            }
1924        }
1925
1926        if affected > 0 {
1927            self.note_table_write(&query.table);
1928        }
1929
1930        Ok((
1931            RuntimeQueryResult::dml_result(
1932                raw_query.to_string(),
1933                affected,
1934                "update",
1935                "runtime-dml",
1936            ),
1937            touched_ids,
1938        ))
1939    }
1940
1941    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1942        let mut static_field_assignments = Vec::new();
1943        let mut static_metadata_assignments = Vec::new();
1944        let mut dynamic_assignments = Vec::new();
1945        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1946        let mut row_modified_columns = Vec::new();
1947
1948        for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1949            let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1950            let metadata_key = resolve_sql_ttl_metadata_key(column);
1951            if compound_op.is_some() && metadata_key.is_some() {
1952                return Err(RedDBError::Query(format!(
1953                    "compound assignment is only supported for row fields: {column}"
1954                )));
1955            }
1956            if compound_op.is_none() {
1957                if let Ok(value) = fold_expr_to_value(expr.clone()) {
1958                    if let Some(metadata_key) = metadata_key {
1959                        let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1960                        let (canonical_key, canonical_value) =
1961                            canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1962                        static_metadata_assignments
1963                            .push((canonical_key.to_string(), canonical_value));
1964                    } else {
1965                        let value = self.resolve_crypto_sentinel(value)?;
1966                        static_field_assignments.push((
1967                            column.clone(),
1968                            normalize_row_update_assignment_with_plan(
1969                                &query.table,
1970                                column,
1971                                value,
1972                                row_contract_plan.as_ref(),
1973                            )?,
1974                        ));
1975                        row_modified_columns.push(column.clone());
1976                    }
1977                    continue;
1978                }
1979            }
1980
1981            dynamic_assignments.push(CompiledUpdateAssignment {
1982                column: column.clone(),
1983                expr: expr.clone(),
1984                compound_op,
1985                metadata_key,
1986                row_rule: if metadata_key.is_none() {
1987                    if let Some(plan) = row_contract_plan.as_ref() {
1988                        if plan.timestamps_enabled
1989                            && (column == "created_at" || column == "updated_at")
1990                        {
1991                            return Err(RedDBError::Query(format!(
1992                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1993                                query.table, column
1994                            )));
1995                        }
1996                        if let Some(rule) = plan.declared_rules.get(column) {
1997                            Some(rule.clone())
1998                        } else if plan.strict_schema {
1999                            return Err(RedDBError::Query(format!(
2000                                "collection '{}' is strict and does not allow undeclared fields: {}",
2001                                query.table, column
2002                            )));
2003                        } else {
2004                            None
2005                        }
2006                    } else {
2007                        None
2008                    }
2009                } else {
2010                    None
2011                },
2012            });
2013            if metadata_key.is_none() {
2014                row_modified_columns.push(column.clone());
2015            }
2016        }
2017
2018        let row_modified_columns = dedupe_update_columns(row_modified_columns);
2019        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
2020            row_modified_columns.iter().any(|column| {
2021                plan.unique_columns
2022                    .keys()
2023                    .any(|unique| unique.eq_ignore_ascii_case(column))
2024            })
2025        });
2026
2027        if let Some(ttl_ms) = query.ttl_ms {
2028            static_metadata_assignments
2029                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
2030        }
2031        if let Some(expires_at_ms) = query.expires_at_ms {
2032            static_metadata_assignments.push((
2033                "_expires_at".to_string(),
2034                metadata_u64_to_value(expires_at_ms),
2035            ));
2036        }
2037        for (key, val) in &query.with_metadata {
2038            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
2039        }
2040
2041        Ok(CompiledUpdatePlan {
2042            static_field_assignments,
2043            static_metadata_assignments,
2044            dynamic_assignments,
2045            row_contract_plan,
2046            row_modified_columns,
2047            row_touches_unique_columns,
2048        })
2049    }
2050
2051    fn materialize_update_assignments_for_entity(
2052        &self,
2053        query: &UpdateQuery,
2054        entity: &UnifiedEntity,
2055        compiled_plan: &CompiledUpdatePlan,
2056    ) -> RedDBResult<MaterializedUpdateAssignments> {
2057        let mut assignments = MaterializedUpdateAssignments::default();
2058        let mut record: Option<UnifiedRecord> = None;
2059
2060        for assignment in &compiled_plan.dynamic_assignments {
2061            if assignment.compound_op.is_some()
2062                && !matches!(
2063                    entity.data,
2064                    EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
2065                )
2066            {
2067                return Err(RedDBError::Query(format!(
2068                    "compound assignment is only supported for row or graph UPDATE column '{}'",
2069                    assignment.column
2070                )));
2071            }
2072            if record.is_none() {
2073                record = runtime_any_record_from_entity_ref(entity);
2074            }
2075            let Some(record) = record.as_ref() else {
2076                return Err(RedDBError::Query(format!(
2077                    "UPDATE could not materialize runtime record for entity {} in '{}'",
2078                    entity.id.raw(),
2079                    query.table
2080                )));
2081            };
2082            let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
2083                Some(self.inner.db.as_ref()),
2084                &assignment.expr,
2085                record,
2086                Some(query.table.as_str()),
2087                Some(query.table.as_str()),
2088            )
2089            .ok_or_else(|| {
2090                RedDBError::Query(format!(
2091                    "failed to evaluate UPDATE expression for column '{}'",
2092                    assignment.column
2093                ))
2094            })?;
2095            let value = if let Some(op) = assignment.compound_op {
2096                evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
2097            } else {
2098                rhs
2099            };
2100
2101            if let Some(metadata_key) = assignment.metadata_key {
2102                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
2103                let (canonical_key, canonical_value) =
2104                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
2105                assignments
2106                    .dynamic_metadata_assignments
2107                    .push((canonical_key.to_string(), canonical_value));
2108            } else {
2109                assignments.dynamic_field_assignments.push((
2110                    assignment.column.clone(),
2111                    normalize_row_update_value_for_rule(
2112                        &query.table,
2113                        self.resolve_crypto_sentinel(value)?,
2114                        assignment.row_rule.as_ref(),
2115                    )?,
2116                ));
2117            }
2118        }
2119
2120        Ok(assignments)
2121    }
2122
2123    fn apply_materialized_update_for_entity(
2124        &self,
2125        collection: String,
2126        entity: UnifiedEntity,
2127        compiled_plan: &CompiledUpdatePlan,
2128        assignments: MaterializedUpdateAssignments,
2129    ) -> RedDBResult<AppliedEntityMutation> {
2130        if matches!(entity.data, EntityData::Row(_)) {
2131            return self.apply_loaded_sql_update_row_core(
2132                collection,
2133                entity,
2134                &compiled_plan.static_field_assignments,
2135                assignments.dynamic_field_assignments,
2136                &compiled_plan.static_metadata_assignments,
2137                assignments.dynamic_metadata_assignments,
2138                compiled_plan.row_contract_plan.as_ref(),
2139                &compiled_plan.row_modified_columns,
2140                compiled_plan.row_touches_unique_columns,
2141            );
2142        }
2143
2144        ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2145
2146        let operations = build_patch_operations_from_materialized_assignments(
2147            &entity,
2148            compiled_plan,
2149            assignments,
2150        );
2151        self.apply_loaded_patch_entity_core(
2152            collection,
2153            entity,
2154            crate::json::Value::Null,
2155            operations,
2156        )
2157    }
2158
2159    /// Execute DELETE FROM table WHERE filter
2160    pub fn execute_delete(
2161        &self,
2162        raw_query: &str,
2163        query: &DeleteQuery,
2164    ) -> RedDBResult<RuntimeQueryResult> {
2165        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2166        // Issue #523 — blockchain collections are immutable; see
2167        // execute_update for the same gate.
2168        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2169            return Err(RedDBError::InvalidOperation(format!(
2170                "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2171                query.table
2172            )));
2173        }
2174        // CollectionContract gate (#50) — see execute_update for
2175        // rationale. The gate handles APPEND ONLY rejection and is
2176        // the single point where future contract bits land.
2177        crate::runtime::collection_contract::CollectionContractGate::check(
2178            self,
2179            &query.table,
2180            crate::runtime::collection_contract::MutationKind::Delete,
2181        )?;
2182
2183        // RETURNING on DELETE: capture the pre-image via an internal
2184        // SELECT that reuses the same WHERE, then run the delete with
2185        // the RETURNING clause stripped, then project the captured
2186        // rows through the requested items. The extra SELECT is a
2187        // pragmatic MVP — a future pass can fuse the scan with the
2188        // delete to avoid the second pass over the heap.
2189        if let Some(items) = query.returning.clone() {
2190            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2191                RedDBError::Query(
2192                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2193                )
2194            })?;
2195            let captured = self.execute_query(&select_sql)?;
2196
2197            let mut inner_query = query.clone();
2198            inner_query.returning = None;
2199            let _ = self.execute_delete(raw_query, &inner_query)?;
2200
2201            let snapshots: Vec<Vec<(String, Value)>> = captured
2202                .result
2203                .records
2204                .iter()
2205                .map(|rec| {
2206                    rec.iter_fields()
2207                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2208                        .collect()
2209                })
2210                .collect();
2211            let affected = snapshots.len() as u64;
2212            let result = build_returning_result(&items, &snapshots, None);
2213
2214            let mut response = RuntimeQueryResult::dml_result(
2215                raw_query.to_string(),
2216                affected,
2217                "delete",
2218                "runtime-dml-returning",
2219            );
2220            response.result = result;
2221            return Ok(response);
2222        }
2223        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
2224        //
2225        // When the table has RLS enabled, gate the DELETE by the
2226        // per-role policy set: mutations only touch rows that *every*
2227        // matching `FOR DELETE` policy would accept. No policies =>
2228        // zero rows affected (PG restrictive-default).
2229        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2230            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2231                self,
2232                &query.table,
2233                crate::storage::query::ast::PolicyAction::Delete,
2234            );
2235            let Some(policy) = rls_filter else {
2236                return Ok(RuntimeQueryResult::dml_result(
2237                    raw_query.to_string(),
2238                    0,
2239                    "delete",
2240                    "runtime-dml-rls",
2241                ));
2242            };
2243            // Fold the policy predicate into the user's WHERE before
2244            // dispatching — the remainder of this function reads the
2245            // filter from `query` via `effective_delete_filter`, which
2246            // respects the updated value.
2247            let mut augmented = query.clone();
2248            augmented.filter = Some(match augmented.filter.take() {
2249                Some(existing) => {
2250                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2251                }
2252                None => policy,
2253            });
2254            return self.execute_delete_inner(raw_query, &augmented);
2255        }
2256        self.execute_delete_inner(raw_query, query)
2257    }
2258
2259    fn execute_delete_inner(
2260        &self,
2261        raw_query: &str,
2262        query: &DeleteQuery,
2263    ) -> RedDBResult<RuntimeQueryResult> {
2264        let effective_filter = effective_delete_filter(query);
2265
2266        // Find the rows that match the WHERE clause. The "find target
2267        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
2268        // the same scan strategy.
2269        let scan = super::dml_target_scan::DmlTargetScan::new(
2270            self,
2271            &query.table,
2272            effective_filter.as_ref(),
2273            None,
2274        );
2275        let ids_to_delete = scan.find_target_ids()?;
2276
2277        // For event-enabled collections, snapshot the pre-delete state
2278        // before rows are physically removed.
2279        let needs_delete_events =
2280            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2281        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2282            scan.row_json_pre_images(&ids_to_delete)
2283        } else {
2284            HashMap::new()
2285        };
2286
2287        let mut affected: u64 = 0;
2288        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2289            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2290            affected += count;
2291            if needs_delete_events && !lsns.is_empty() {
2292                // lsns.len() == actually-deleted entities; align with chunk ids.
2293                // `delete_batch` may skip missing entities, so we correlate by
2294                // the number returned (they're emitted in chunk order).
2295                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2296                self.emit_delete_events_for_collection(
2297                    &query.table,
2298                    deleted_chunk,
2299                    &lsns,
2300                    &pre_images,
2301                )?;
2302            }
2303        }
2304        pre_images.clear();
2305
2306        if affected > 0 {
2307            self.note_table_write(&query.table);
2308        }
2309
2310        Ok(RuntimeQueryResult::dml_result(
2311            raw_query.to_string(),
2312            affected,
2313            "delete",
2314            "runtime-dml",
2315        ))
2316    }
2317}
2318
2319/// Reject UPDATE … NODES/EDGES that assign to graph identity/topology
2320/// columns regardless of whether any row matches the WHERE clause. The
2321/// per-entity guard below covers only the matched-rows case, but ADR 0019
2322/// declares these columns immutable on the surface itself, so a zero-row
2323/// UPDATE should still surface the same error to operators and SDKs.
2324fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2325    if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2326        return Ok(());
2327    }
2328    for (column, _) in &query.assignment_exprs {
2329        if is_immutable_graph_identity_field(column) {
2330            return Err(RedDBError::Query(format!(
2331                "immutable graph field '{column}' cannot be updated"
2332            )));
2333        }
2334    }
2335    Ok(())
2336}
2337
2338fn ensure_graph_identity_update_allowed(
2339    entity: &UnifiedEntity,
2340    compiled_plan: &CompiledUpdatePlan,
2341    assignments: &MaterializedUpdateAssignments,
2342) -> RedDBResult<()> {
2343    if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2344        return Ok(());
2345    }
2346
2347    for (column, _) in compiled_plan
2348        .static_field_assignments
2349        .iter()
2350        .chain(assignments.dynamic_field_assignments.iter())
2351    {
2352        if is_immutable_graph_identity_field(column) {
2353            return Err(RedDBError::Query(format!(
2354                "immutable graph field '{column}' cannot be updated"
2355            )));
2356        }
2357    }
2358
2359    Ok(())
2360}
2361
/// Whether `column` names one of the graph identity/topology fields that
/// UPDATE may never assign. The comparison ignores ASCII case.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const IMMUTABLE_FIELDS: [&str; 6] = ["rid", "label", "from_rid", "to_rid", "from", "to"];

    for reserved in IMMUTABLE_FIELDS {
        if reserved.eq_ignore_ascii_case(column) {
            return true;
        }
    }
    false
}
2367
2368fn build_patch_operations_from_materialized_assignments(
2369    entity: &UnifiedEntity,
2370    compiled_plan: &CompiledUpdatePlan,
2371    assignments: MaterializedUpdateAssignments,
2372) -> Vec<PatchEntityOperation> {
2373    let mut operations = Vec::with_capacity(
2374        compiled_plan.static_field_assignments.len()
2375            + compiled_plan.static_metadata_assignments.len()
2376            + assignments.dynamic_field_assignments.len()
2377            + assignments.dynamic_metadata_assignments.len(),
2378    );
2379
2380    for (column, value) in &compiled_plan.static_field_assignments {
2381        operations.push(PatchEntityOperation {
2382            op: PatchEntityOperationType::Set,
2383            path: update_patch_path_for_entity(entity, column),
2384            value: Some(storage_value_to_json(value)),
2385        });
2386    }
2387
2388    for (column, value) in assignments.dynamic_field_assignments {
2389        operations.push(PatchEntityOperation {
2390            op: PatchEntityOperationType::Set,
2391            path: update_patch_path_for_entity(entity, &column),
2392            value: Some(storage_value_to_json(&value)),
2393        });
2394    }
2395
2396    for (key, value) in &compiled_plan.static_metadata_assignments {
2397        operations.push(PatchEntityOperation {
2398            op: PatchEntityOperationType::Set,
2399            path: vec!["metadata".to_string(), key.clone()],
2400            value: Some(metadata_value_to_json(value)),
2401        });
2402    }
2403
2404    for (key, value) in assignments.dynamic_metadata_assignments {
2405        operations.push(PatchEntityOperation {
2406            op: PatchEntityOperationType::Set,
2407            path: vec!["metadata".to_string(), key],
2408            value: Some(metadata_value_to_json(&value)),
2409        });
2410    }
2411
2412    operations
2413}
2414
2415fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2416    if matches!(
2417        (&entity.kind, &entity.data),
2418        (
2419            crate::storage::EntityKind::GraphNode(_),
2420            EntityData::Node(_)
2421        )
2422    ) && column.eq_ignore_ascii_case("node_type")
2423    {
2424        return vec!["node_type".to_string()];
2425    }
2426    if matches!(
2427        (&entity.kind, &entity.data),
2428        (
2429            crate::storage::EntityKind::GraphEdge(_),
2430            EntityData::Edge(_)
2431        )
2432    ) && column.eq_ignore_ascii_case("weight")
2433    {
2434        return vec!["weight".to_string()];
2435    }
2436    vec!["fields".to_string(), column.to_string()]
2437}
2438
/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
/// capture the pre-image before actually removing the rows.
///
/// Keywords are matched case-insensitively and may be separated by any
/// ASCII whitespace (spaces, tabs, newlines), so multi-line statements are
/// rewritten as well. Everything between the table name and the RETURNING
/// clause is preserved verbatim, so WHERE / ORDER BY / LIMIT survive
/// untouched. The RETURNING tail — if present — is cut at the first
/// top-level `RETURNING` token and trailing whitespace is trimmed.
///
/// Returns `None` when the input does not start with `DELETE` followed by
/// whitespace, or when no `FROM` keyword with something after it is found.
fn delete_to_select_sql(sql: &str) -> Option<String> {
    const DELETE_KW: &str = "delete";
    const FROM_KW: &str = "from";

    let trimmed = sql.trim_start();
    // ASCII lowercasing never changes byte lengths, so every offset found
    // in `lowered` is also a valid offset into `trimmed`.
    let lowered = trimmed.to_ascii_lowercase();

    let after_delete = lowered.strip_prefix(DELETE_KW)?;
    let delete_is_bounded = after_delete
        .as_bytes()
        .first()
        .is_some_and(u8::is_ascii_whitespace);
    if !delete_is_bounded {
        return None;
    }

    // Locate the whitespace-bounded FROM that follows DELETE.
    let from_rel = find_top_level_keyword(after_delete, FROM_KW)?;
    let from_end = DELETE_KW.len() + from_rel + FROM_KW.len();
    // Skip exactly one separator byte after FROM (it is ASCII whitespace by
    // construction); a FROM at the very end of the input yields `None`.
    let after_from = trimmed.get(from_end + 1..)?;
    let after_from_lc = lowered.get(from_end + 1..)?;

    // Cut off the RETURNING tail. The clause appears at most once at top
    // level in our grammar; occurrences inside string literals are skipped.
    let body = match find_top_level_keyword(after_from_lc, "returning") {
        Some(pos) => &after_from[..pos],
        None => after_from,
    };
    Some(format!("SELECT * FROM {}", body.trim_end()))
}

/// Find the byte offset of a whitespace-bounded keyword in a lowercased
/// haystack, skipping matches inside single-quoted string literals.
///
/// A match must be preceded by the start of the haystack or ASCII
/// whitespace and followed by the end of the haystack or ASCII whitespace.
/// Every `'` toggles the in-string state, so the SQL `''` escape balances
/// out on its own; backslash escapes are not interpreted. Naive, but enough
/// for the shapes the DML parser emits.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let needle = needle.as_bytes();
    let mut in_string = false;

    for (start, &byte) in bytes.iter().enumerate() {
        if byte == b'\'' {
            in_string = !in_string;
            continue;
        }
        if in_string || !bytes[start..].starts_with(needle) {
            continue;
        }

        let end = start + needle.len();
        let bounded_before = start == 0 || bytes[start - 1].is_ascii_whitespace();
        let bounded_after = end == bytes.len() || bytes[end].is_ascii_whitespace();
        if bounded_before && bounded_after {
            return Some(start);
        }
    }
    None
}
2498
2499/// Build a `UnifiedResult` from the rows affected by a DML statement plus
2500/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
2501/// for one affected row; `outputs`, when provided, supplies the engine-
2502/// assigned entity id for the same row (INSERT path). Projection honours
2503/// the RETURNING items: `*` expands to every snapshot column plus
2504/// the public row envelope when available.
2505fn build_returning_result(
2506    items: &[ReturningItem],
2507    snapshots: &[Vec<(String, Value)>],
2508    outputs: Option<&[CreateEntityOutput]>,
2509) -> UnifiedResult {
2510    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2511    let public_item_outputs = outputs.is_some_and(|outs| {
2512        outs.first()
2513            .and_then(|out| out.entity.as_ref())
2514            .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2515    });
2516
2517    let mut columns: Vec<String> = if project_all {
2518        let mut cols: Vec<String> = Vec::new();
2519        if public_item_outputs {
2520            cols.extend(
2521                [
2522                    "rid",
2523                    "collection",
2524                    "kind",
2525                    "tenant",
2526                    "created_at",
2527                    "updated_at",
2528                ]
2529                .into_iter()
2530                .map(str::to_string),
2531            );
2532        } else if outputs.is_some() {
2533            cols.push("red_entity_id".to_string());
2534        }
2535        if let Some(first) = snapshots.first() {
2536            for (name, _) in first {
2537                cols.push(name.clone());
2538            }
2539        }
2540        cols
2541    } else {
2542        items
2543            .iter()
2544            .filter_map(|it| match it {
2545                ReturningItem::Column(c) => Some(c.clone()),
2546                ReturningItem::All => None,
2547            })
2548            .collect()
2549    };
2550    // Guarantee unique order-preserving column list.
2551    {
2552        let mut seen = std::collections::HashSet::new();
2553        columns.retain(|c| seen.insert(c.clone()));
2554    }
2555
2556    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2557    for (idx, snap) in snapshots.iter().enumerate() {
2558        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2559        if let Some(outs) = outputs {
2560            if let Some(out) = outs.get(idx) {
2561                if let Some(entity) = out.entity.as_ref() {
2562                    if let Some(kind) = public_returning_item_kind(entity) {
2563                        values.insert(
2564                            Arc::clone(&sys_key_rid()),
2565                            Value::UnsignedInteger(out.id.raw()),
2566                        );
2567                        values.insert(
2568                            Arc::clone(&sys_key_collection()),
2569                            Value::text(entity.kind.collection().to_string()),
2570                        );
2571                        values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2572                        values.insert(
2573                            Arc::clone(&sys_key_created_at()),
2574                            Value::UnsignedInteger(entity.created_at),
2575                        );
2576                        values.insert(
2577                            Arc::clone(&sys_key_updated_at()),
2578                            Value::UnsignedInteger(entity.updated_at),
2579                        );
2580                        // Legacy alias: an explicit `RETURNING red_entity_id`
2581                        // still resolves to the row's rid. Only surfaces when
2582                        // the projected column list names it — `RETURNING *`
2583                        // keeps the envelope clean (rid, not red_entity_id).
2584                        values.insert(
2585                            Arc::clone(&sys_key_red_entity_id()),
2586                            Value::UnsignedInteger(out.id.raw()),
2587                        );
2588                    } else {
2589                        values.insert(
2590                            Arc::clone(&sys_key_red_entity_id()),
2591                            Value::Integer(out.id.raw() as i64),
2592                        );
2593                    }
2594                } else {
2595                    values.insert(
2596                        Arc::clone(&sys_key_red_entity_id()),
2597                        Value::Integer(out.id.raw() as i64),
2598                    );
2599                }
2600            }
2601        }
2602        for (name, val) in snap {
2603            values.insert(Arc::from(name.as_str()), val.clone());
2604        }
2605        if !values.contains_key("tenant") {
2606            let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2607            values.insert(Arc::clone(&sys_key_tenant()), tenant);
2608        }
2609        let mut rec = UnifiedRecord::default();
2610        // Only keep projected columns on the record.
2611        for col in &columns {
2612            if let Some(v) = values.get(col.as_str()) {
2613                rec.set_arc(Arc::from(col.as_str()), v.clone());
2614            }
2615        }
2616        records.push(rec);
2617    }
2618
2619    UnifiedResult {
2620        columns,
2621        records,
2622        stats: Default::default(),
2623        pre_serialized_json: None,
2624    }
2625}
2626
2627fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2628    match (&entity.kind, &entity.data) {
2629        (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2630            Some("node")
2631        }
2632        (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2633            Some("edge")
2634        }
2635        (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2636        _ => None,
2637    }
2638}
2639
2640fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2641    let Some(row) = entity.data.as_row() else {
2642        return "row";
2643    };
2644
2645    let is_kv = row.named.as_ref().is_some_and(|named| {
2646        (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2647            || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2648    });
2649    if is_kv {
2650        return "kv";
2651    }
2652
2653    let is_document = row
2654        .named
2655        .as_ref()
2656        .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2657        || row.columns.iter().any(runtime_returning_documentish_value);
2658    if is_document {
2659        "document"
2660    } else {
2661        "row"
2662    }
2663}
2664
2665fn runtime_returning_documentish_value(value: &Value) -> bool {
2666    matches!(value, Value::Json(_) | Value::Blob(_))
2667}
2668
2669fn row_insert_returning_snapshots(
2670    outputs: &[CreateEntityOutput],
2671    fallback: Vec<Vec<(String, Value)>>,
2672) -> Vec<Vec<(String, Value)>> {
2673    outputs
2674        .iter()
2675        .enumerate()
2676        .map(|(idx, out)| {
2677            out.entity
2678                .as_ref()
2679                .map(entity_row_fields_snapshot)
2680                .filter(|snap| !snap.is_empty())
2681                .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2682        })
2683        .collect()
2684}
2685
2686fn graph_insert_returning_snapshots(
2687    store: &crate::storage::unified::UnifiedStore,
2688    collection: &str,
2689    ids: &[EntityId],
2690) -> Vec<Vec<(String, Value)>> {
2691    let Some(manager) = store.get_collection(collection) else {
2692        return Vec::new();
2693    };
2694
2695    ids.iter()
2696        .filter_map(|id| manager.get(*id))
2697        .filter_map(|entity| {
2698            let mut record = runtime_any_record_from_entity_ref(&entity)?;
2699            record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2700            Some(record)
2701        })
2702        .map(|record| {
2703            record
2704                .iter_fields()
2705                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2706                .collect()
2707        })
2708        .collect()
2709}
2710
2711fn graph_update_returning_snapshots(
2712    runtime: &RedDBRuntime,
2713    collection: &str,
2714    ids: &[EntityId],
2715) -> Vec<Vec<(String, Value)>> {
2716    let store = runtime.db().store();
2717    let Some(manager) = store.get_collection(collection) else {
2718        return Vec::new();
2719    };
2720
2721    manager
2722        .get_many(ids)
2723        .into_iter()
2724        .flatten()
2725        .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2726        .map(|record| {
2727            record
2728                .iter_fields()
2729                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2730                .collect()
2731        })
2732        .collect()
2733}
2734
2735fn ensure_update_target_contract(
2736    runtime: &RedDBRuntime,
2737    collection: &str,
2738    target: UpdateTarget,
2739) -> RedDBResult<()> {
2740    let Some(contract) = runtime.db().collection_contract(collection) else {
2741        return Ok(());
2742    };
2743    if update_target_contract_is_advisory(&contract)
2744        || update_target_allows_model(contract.declared_model, update_target_model(target))
2745    {
2746        return Ok(());
2747    }
2748    Err(RedDBError::InvalidOperation(format!(
2749        "collection '{}' is declared as '{}' and does not allow '{}' updates",
2750        collection,
2751        update_model_name(contract.declared_model),
2752        update_model_name(update_target_model(target))
2753    )))
2754}
2755
2756fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2757    matches!(
2758        (&contract.origin, &contract.schema_mode),
2759        (
2760            crate::physical::ContractOrigin::Implicit,
2761            crate::catalog::SchemaMode::Dynamic,
2762        )
2763    )
2764}
2765
2766fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2767    match target {
2768        UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2769        UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2770        UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2771        UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2772    }
2773}
2774
2775fn update_target_allows_model(
2776    declared_model: crate::catalog::CollectionModel,
2777    requested_model: crate::catalog::CollectionModel,
2778) -> bool {
2779    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2780}
2781
2782fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2783    match model {
2784        crate::catalog::CollectionModel::Table => "table",
2785        crate::catalog::CollectionModel::Document => "document",
2786        crate::catalog::CollectionModel::Graph => "graph",
2787        crate::catalog::CollectionModel::Vector => "vector",
2788        crate::catalog::CollectionModel::Hll => "hll",
2789        crate::catalog::CollectionModel::Sketch => "sketch",
2790        crate::catalog::CollectionModel::Filter => "filter",
2791        crate::catalog::CollectionModel::Kv => "kv",
2792        crate::catalog::CollectionModel::Config => "config",
2793        crate::catalog::CollectionModel::Vault => "vault",
2794        crate::catalog::CollectionModel::Mixed => "mixed",
2795        crate::catalog::CollectionModel::TimeSeries => "timeseries",
2796        crate::catalog::CollectionModel::Queue => "queue",
2797        crate::catalog::CollectionModel::Metrics => "metrics",
2798    }
2799}
2800
2801fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2802    let db = runtime.db();
2803    if let Some(contract) = db.collection_contract(collection) {
2804        let advisory_implicit_dynamic = matches!(
2805            (&contract.origin, &contract.schema_mode),
2806            (
2807                crate::physical::ContractOrigin::Implicit,
2808                crate::catalog::SchemaMode::Dynamic,
2809            )
2810        );
2811        if advisory_implicit_dynamic
2812            || matches!(
2813                contract.declared_model,
2814                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2815            )
2816        {
2817            return Ok(());
2818        }
2819        return Err(RedDBError::InvalidOperation(format!(
2820            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2821            collection, contract.declared_model
2822        )));
2823    }
2824
2825    let now = std::time::SystemTime::now()
2826        .duration_since(std::time::UNIX_EPOCH)
2827        .unwrap_or_default()
2828        .as_millis();
2829    db.save_collection_contract(crate::physical::CollectionContract {
2830        name: collection.to_string(),
2831        declared_model: crate::catalog::CollectionModel::Graph,
2832        schema_mode: crate::catalog::SchemaMode::Dynamic,
2833        origin: crate::physical::ContractOrigin::Implicit,
2834        version: 1,
2835        created_at_unix_ms: now,
2836        updated_at_unix_ms: now,
2837        default_ttl_ms: db.collection_default_ttl_ms(collection),
2838        vector_dimension: None,
2839        vector_metric: None,
2840        context_index_fields: Vec::new(),
2841        declared_columns: Vec::new(),
2842        table_def: None,
2843        timestamps_enabled: false,
2844        context_index_enabled: false,
2845        metrics_raw_retention_ms: None,
2846        metrics_rollup_policies: Vec::new(),
2847        metrics_tenant_identity: None,
2848        metrics_namespace: None,
2849        append_only: false,
2850        subscriptions: Vec::new(),
2851        analytics_config: Vec::new(),
2852        session_key: None,
2853        session_gap_ms: None,
2854        retention_duration_ms: None,
2855        analytical_storage: None,
2856    })
2857    .map(|_| ())
2858    .map_err(|err| RedDBError::Internal(err.to_string()))
2859}
2860
2861fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2862    query
2863        .assignment_exprs
2864        .iter()
2865        .enumerate()
2866        .any(|(idx, (column, expr))| {
2867            query
2868                .compound_assignment_ops
2869                .get(idx)
2870                .is_some_and(|op| op.is_some())
2871                || expr_references_update_column(expr, &query.table, column)
2872        })
2873}
2874
2875fn evaluate_compound_update_assignment(
2876    column: &str,
2877    record: &UnifiedRecord,
2878    op: BinOp,
2879    rhs: Value,
2880) -> RedDBResult<Value> {
2881    let lhs = record.get(column).ok_or_else(|| {
2882        RedDBError::Query(format!(
2883            "compound assignment requires existing numeric field '{column}'"
2884        ))
2885    })?;
2886    if matches!(lhs, Value::Null) {
2887        return Err(RedDBError::Query(format!(
2888            "compound assignment requires non-null numeric field '{column}'"
2889        )));
2890    }
2891    apply_compound_numeric_op(column, op, lhs, &rhs)
2892}
2893
2894fn apply_compound_numeric_op(
2895    column: &str,
2896    op: BinOp,
2897    lhs: &Value,
2898    rhs: &Value,
2899) -> RedDBResult<Value> {
2900    let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2901        return Err(RedDBError::Query(format!(
2902            "compound assignment requires numeric field '{column}'"
2903        )));
2904    };
2905    let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2906        return Err(RedDBError::Query(format!(
2907            "compound assignment requires numeric right-hand value for field '{column}'"
2908        )));
2909    };
2910
2911    if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2912        let a = lhs_number.as_f64();
2913        let b = rhs_number.as_f64();
2914        let out = match op {
2915            BinOp::Add => a + b,
2916            BinOp::Sub => a - b,
2917            BinOp::Mul => a * b,
2918            BinOp::Div => {
2919                if b == 0.0 {
2920                    return Err(RedDBError::Query(format!(
2921                        "division by zero in compound assignment for field '{column}'"
2922                    )));
2923                }
2924                a / b
2925            }
2926            BinOp::Mod => {
2927                if b == 0.0 {
2928                    return Err(RedDBError::Query(format!(
2929                        "modulo by zero in compound assignment for field '{column}'"
2930                    )));
2931                }
2932                a % b
2933            }
2934            _ => {
2935                return Err(RedDBError::Query(format!(
2936                    "unsupported compound assignment operator for field '{column}'"
2937                )));
2938            }
2939        };
2940        if !out.is_finite() {
2941            return Err(RedDBError::Query(format!(
2942                "numeric overflow in compound assignment for field '{column}'"
2943            )));
2944        }
2945        return Ok(Value::Float(out));
2946    }
2947
2948    let a = lhs_number.as_i128();
2949    let b = rhs_number.as_i128();
2950    let out = match op {
2951        BinOp::Add => a.checked_add(b),
2952        BinOp::Sub => a.checked_sub(b),
2953        BinOp::Mul => a.checked_mul(b),
2954        BinOp::Mod => {
2955            if b == 0 {
2956                return Err(RedDBError::Query(format!(
2957                    "modulo by zero in compound assignment for field '{column}'"
2958                )));
2959            }
2960            a.checked_rem(b)
2961        }
2962        BinOp::Div => unreachable!("integer division is handled by the float branch"),
2963        _ => None,
2964    }
2965    .ok_or_else(|| {
2966        RedDBError::Query(format!(
2967            "numeric overflow in compound assignment for field '{column}'"
2968        ))
2969    })?;
2970
2971    if matches!(lhs, Value::UnsignedInteger(_)) {
2972        let value = u64::try_from(out).map_err(|_| {
2973            RedDBError::Query(format!(
2974                "numeric overflow in compound assignment for field '{column}'"
2975            ))
2976        })?;
2977        Ok(Value::UnsignedInteger(value))
2978    } else {
2979        let value = i64::try_from(out).map_err(|_| {
2980            RedDBError::Query(format!(
2981                "numeric overflow in compound assignment for field '{column}'"
2982            ))
2983        })?;
2984        Ok(Value::Integer(value))
2985    }
2986}
2987
2988#[derive(Clone, Copy)]
2989enum CompoundNumber {
2990    Integer(i128),
2991    Float(f64),
2992}
2993
2994impl CompoundNumber {
2995    fn from_value(value: &Value) -> Option<Self> {
2996        match value {
2997            Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2998            Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2999            Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
3000            Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
3001            _ => None,
3002        }
3003    }
3004
3005    fn is_float(self) -> bool {
3006        matches!(self, Self::Float(_))
3007    }
3008
3009    fn as_f64(self) -> f64 {
3010        match self {
3011            Self::Integer(value) => value as f64,
3012            Self::Float(value) => value,
3013        }
3014    }
3015
3016    fn as_i128(self) -> i128 {
3017        match self {
3018            Self::Integer(value) => value,
3019            Self::Float(_) => unreachable!("float compound number used as integer"),
3020        }
3021    }
3022}
3023
3024fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
3025    match expr {
3026        Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
3027        Expr::Column { field, .. } => {
3028            field_ref_matches_update_column(field, table_name, target_column)
3029        }
3030        Expr::BinaryOp { lhs, rhs, .. } => {
3031            expr_references_update_column(lhs, table_name, target_column)
3032                || expr_references_update_column(rhs, table_name, target_column)
3033        }
3034        Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
3035            expr_references_update_column(operand, table_name, target_column)
3036        }
3037        Expr::FunctionCall { args, .. } => args
3038            .iter()
3039            .any(|arg| expr_references_update_column(arg, table_name, target_column)),
3040        Expr::Case {
3041            branches, else_, ..
3042        } => {
3043            branches.iter().any(|(cond, value)| {
3044                expr_references_update_column(cond, table_name, target_column)
3045                    || expr_references_update_column(value, table_name, target_column)
3046            }) || else_
3047                .as_deref()
3048                .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
3049        }
3050        Expr::IsNull { operand, .. } => {
3051            expr_references_update_column(operand, table_name, target_column)
3052        }
3053        Expr::InList { target, values, .. } => {
3054            expr_references_update_column(target, table_name, target_column)
3055                || values
3056                    .iter()
3057                    .any(|value| expr_references_update_column(value, table_name, target_column))
3058        }
3059        Expr::Between {
3060            target, low, high, ..
3061        } => {
3062            expr_references_update_column(target, table_name, target_column)
3063                || expr_references_update_column(low, table_name, target_column)
3064                || expr_references_update_column(high, table_name, target_column)
3065        }
3066        Expr::WindowFunctionCall { args, window, .. } => {
3067            args.iter()
3068                .any(|arg| expr_references_update_column(arg, table_name, target_column))
3069                || window
3070                    .partition_by
3071                    .iter()
3072                    .any(|e| expr_references_update_column(e, table_name, target_column))
3073                || window
3074                    .order_by
3075                    .iter()
3076                    .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
3077        }
3078    }
3079}
3080
3081fn field_ref_matches_update_column(
3082    field: &FieldRef,
3083    table_name: &str,
3084    target_column: &str,
3085) -> bool {
3086    match field {
3087        FieldRef::TableColumn { table, column } => {
3088            column.eq_ignore_ascii_case(target_column)
3089                && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
3090        }
3091        FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
3092            false
3093        }
3094    }
3095}
3096
3097fn resolve_update_entity_by_logical_id(
3098    runtime: &RedDBRuntime,
3099    table: &str,
3100    logical_id: EntityId,
3101) -> Option<UnifiedEntity> {
3102    let store = runtime.inner.db.store();
3103    if let Some(entity) = store.get_table_row_by_logical_id(table, logical_id) {
3104        return Some(entity);
3105    }
3106    // Fallback for non-table-row entities (graph nodes/edges, etc.) where
3107    // entity_id == logical_id and the MVCC table-row resolver doesn't apply.
3108    store.get(table, logical_id)
3109}
3110
3111fn update_cdc_item_kind(
3112    runtime: &RedDBRuntime,
3113    collection: &str,
3114    entity: &UnifiedEntity,
3115) -> &'static str {
3116    match &entity.data {
3117        EntityData::Node(_) => return "node",
3118        EntityData::Edge(_) => return "edge",
3119        _ => {}
3120    }
3121
3122    match runtime
3123        .db()
3124        .collection_contract(collection)
3125        .map(|contract| contract.declared_model)
3126    {
3127        Some(crate::catalog::CollectionModel::Document) => "document",
3128        Some(crate::catalog::CollectionModel::Kv)
3129        | Some(crate::catalog::CollectionModel::Vault) => "kv",
3130        _ => "row",
3131    }
3132}
3133
3134fn ordered_update_target_ids(
3135    manager: &Arc<crate::storage::SegmentManager>,
3136    entity_ids: &[EntityId],
3137    order_by: &[OrderByClause],
3138    limit: Option<usize>,
3139) -> Vec<EntityId> {
3140    let mut entities: Vec<UnifiedEntity> =
3141        manager.get_many(entity_ids).into_iter().flatten().collect();
3142    entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3143    if let Some(limit) = limit {
3144        entities.truncate(limit);
3145    }
3146    entities.into_iter().map(|entity| entity.id).collect()
3147}
3148
3149fn compare_update_order(
3150    left: &UnifiedEntity,
3151    right: &UnifiedEntity,
3152    order_by: &[OrderByClause],
3153) -> Ordering {
3154    for clause in order_by {
3155        let left_value = update_order_value(left, &clause.field);
3156        let right_value = update_order_value(right, &clause.field);
3157        let ordering = compare_update_order_values(
3158            left_value.as_ref(),
3159            right_value.as_ref(),
3160            clause.nulls_first,
3161        );
3162        if ordering != Ordering::Equal {
3163            return if clause.ascending {
3164                ordering
3165            } else {
3166                ordering.reverse()
3167            };
3168        }
3169    }
3170    left.logical_id().raw().cmp(&right.logical_id().raw())
3171}
3172
3173fn compare_update_order_values(
3174    left: Option<&Value>,
3175    right: Option<&Value>,
3176    nulls_first: bool,
3177) -> Ordering {
3178    match (left, right) {
3179        (None, None) => Ordering::Equal,
3180        (None, Some(_)) => {
3181            if nulls_first {
3182                Ordering::Less
3183            } else {
3184                Ordering::Greater
3185            }
3186        }
3187        (Some(_), None) => {
3188            if nulls_first {
3189                Ordering::Greater
3190            } else {
3191                Ordering::Less
3192            }
3193        }
3194        (Some(left), Some(right)) => {
3195            crate::storage::query::value_compare::total_compare_values(left, right)
3196        }
3197    }
3198}
3199
3200fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3201    let FieldRef::TableColumn { table, column } = field else {
3202        return None;
3203    };
3204    if !table.is_empty() {
3205        return None;
3206    }
3207    if column.eq_ignore_ascii_case("rid") {
3208        return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3209    }
3210    match &entity.data {
3211        EntityData::Row(row) => row.get_field(column).cloned(),
3212        EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3213            .and_then(|record| record.get(column).cloned()),
3214        _ => None,
3215    }
3216}
3217
/// Removes duplicate column names, comparing ASCII case-insensitively.
/// The first spelling of each name is kept and the original order is
/// preserved.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(columns.len());
    for candidate in columns {
        let already_kept = kept
            .iter()
            .any(|seen| seen.eq_ignore_ascii_case(&candidate));
        if !already_kept {
            kept.push(candidate);
        }
    }
    kept
}
3234
3235// =============================================================================
3236// Helper functions for extracting typed values from column/value pairs
3237// =============================================================================
3238
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Maps a SQL column name to the TTL metadata key it stands for.
///
/// The comparison is ASCII case-insensitive and the returned key is the
/// lower-case spelling from `SQL_TTL_METADATA_COLUMNS`. Columns that are not
/// TTL metadata columns yield `None`.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|candidate| column.eq_ignore_ascii_case(candidate))
}
3252
3253/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
3254/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
3255/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
3256/// `_ttl_ms` and `_expires_at` are passed through.
3257fn canonicalize_sql_ttl_metadata(
3258    key: &'static str,
3259    value: MetadataValue,
3260) -> (&'static str, MetadataValue) {
3261    if key != "_ttl" {
3262        return (key, value);
3263    }
3264    let scaled = match value {
3265        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3266        MetadataValue::Timestamp(ms_or_s) => {
3267            // Timestamp is already chosen for very large values; treat as
3268            // already-ms to avoid silent overflow.
3269            MetadataValue::Timestamp(ms_or_s)
3270        }
3271        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3272        other => other,
3273    };
3274    ("_ttl_ms", scaled)
3275}
3276
3277/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
3278/// `SECRET('...')` literals. The runtime strips this marker and
3279/// applies the actual crypto transform during INSERT execution.
3280pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3281
3282impl RedDBRuntime {
3283    /// Strip the plaintext sentinel from a `Value::Password` or
3284    /// `Value::Secret` produced by the parser and apply the real
3285    /// crypto transform. `Password` is always hashed with argon2id.
3286    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
3287    /// when `red.config.secret.auto_encrypt = true` (default).
3288    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3289        match value {
3290            Value::Password(marked) => {
3291                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3292                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
3293                } else {
3294                    Ok(Value::Password(marked))
3295                }
3296            }
3297            Value::Secret(bytes) => {
3298                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3299                    if !self.secret_auto_encrypt() {
3300                        return Err(RedDBError::Query(
3301                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
3302                             is false. Insert pre-encrypted bytes directly instead."
3303                                .to_string(),
3304                        ));
3305                    }
3306                    let key = self.secret_aes_key().ok_or_else(|| {
3307                        RedDBError::Query(
3308                            "SECRET() column encryption requires a bootstrapped \
3309                             vault (red.secret.aes_key is missing). Start the server \
3310                             with --vault to enable."
3311                                .to_string(),
3312                        )
3313                    })?;
3314                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3315                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3316                } else {
3317                    Ok(Value::Secret(bytes))
3318                }
3319            }
3320            other => Ok(other),
3321        }
3322    }
3323}
3324
3325/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
3326/// This is the on-disk representation of `Value::Secret`.
3327fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3328    let nonce_bytes = crate::auth::store::random_bytes(12);
3329    let mut nonce = [0u8; 12];
3330    nonce.copy_from_slice(&nonce_bytes[..12]);
3331    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3332    let mut out = Vec::with_capacity(12 + ct.len());
3333    out.extend_from_slice(&nonce);
3334    out.extend_from_slice(&ct);
3335    out
3336}
3337
3338/// Decode a `Value::Secret` payload back to plaintext. Returns
3339/// `None` when the payload is too short or AES-GCM authentication
3340/// fails (tampered or wrong key).
3341pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3342    if payload.len() < 12 {
3343        return None;
3344    }
3345    let mut nonce = [0u8; 12];
3346    nonce.copy_from_slice(&payload[..12]);
3347    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3348}
3349
3350fn split_insert_metadata(
3351    runtime: &RedDBRuntime,
3352    columns: &[String],
3353    values: &[Value],
3354) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3355    let mut fields = Vec::new();
3356    let mut metadata = Vec::new();
3357
3358    for (column, value) in columns.iter().zip(values.iter()) {
3359        // Still support legacy _ttl columns for backward compat
3360        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3361            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3362            let (canonical_key, canonical_value) =
3363                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3364            metadata.push((canonical_key.to_string(), canonical_value));
3365            continue;
3366        }
3367        fields.push((
3368            column.clone(),
3369            runtime.resolve_crypto_sentinel(value.clone())?,
3370        ));
3371    }
3372
3373    Ok((fields, metadata))
3374}
3375
3376/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
3377fn merge_with_clauses(
3378    metadata: &mut Vec<(String, MetadataValue)>,
3379    ttl_ms: Option<u64>,
3380    expires_at_ms: Option<u64>,
3381    with_metadata: &[(String, Value)],
3382) {
3383    if let Some(ms) = ttl_ms {
3384        metadata.push((
3385            "_ttl_ms".to_string(),
3386            if ms <= i64::MAX as u64 {
3387                MetadataValue::Int(ms as i64)
3388            } else {
3389                MetadataValue::Timestamp(ms)
3390            },
3391        ));
3392    }
3393    if let Some(ms) = expires_at_ms {
3394        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3395    }
3396    for (key, value) in with_metadata {
3397        let meta_value = match value {
3398            Value::Text(s) => MetadataValue::String(s.to_string()),
3399            Value::Integer(n) => MetadataValue::Int(*n),
3400            Value::Float(n) => MetadataValue::Float(*n),
3401            Value::Boolean(b) => MetadataValue::Bool(*b),
3402            _ => MetadataValue::String(value.to_string()),
3403        };
3404        metadata.push((key.clone(), meta_value));
3405    }
3406}
3407
3408fn merge_vector_metadata_column(
3409    metadata: &mut Vec<(String, MetadataValue)>,
3410    columns: &[String],
3411    values: &[Value],
3412) -> RedDBResult<()> {
3413    let Some(value) = columns
3414        .iter()
3415        .position(|column| column.eq_ignore_ascii_case("metadata"))
3416        .map(|index| &values[index])
3417    else {
3418        return Ok(());
3419    };
3420    let json = match value {
3421        Value::Null => return Ok(()),
3422        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3423            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3424        })?,
3425        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3426            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3427        })?,
3428        other => {
3429            return Err(RedDBError::Query(format!(
3430                "column 'metadata' expected JSON object, got {other:?}"
3431            )))
3432        }
3433    };
3434    let parsed = metadata_from_json(&json)?;
3435    for (key, value) in parsed.iter() {
3436        metadata.push((key.clone(), value.clone()));
3437    }
3438    Ok(())
3439}
3440
3441fn apply_collection_default_ttl_metadata(
3442    runtime: &RedDBRuntime,
3443    collection: &str,
3444    metadata: &mut Vec<(String, MetadataValue)>,
3445) {
3446    if has_internal_ttl_metadata(metadata) {
3447        return;
3448    }
3449
3450    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3451        return;
3452    };
3453
3454    metadata.push((
3455        "_ttl_ms".to_string(),
3456        if default_ttl_ms <= i64::MAX as u64 {
3457            MetadataValue::Int(default_ttl_ms as i64)
3458        } else {
3459            MetadataValue::Timestamp(default_ttl_ms)
3460        },
3461    ));
3462}
3463
3464fn ensure_non_tree_reserved_metadata_entries(
3465    metadata: &[(String, MetadataValue)],
3466) -> RedDBResult<()> {
3467    for (key, _) in metadata {
3468        ensure_non_tree_reserved_metadata_key(key)?;
3469    }
3470    Ok(())
3471}
3472
3473fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3474    if key.starts_with(TREE_METADATA_PREFIX) {
3475        return Err(RedDBError::Query(format!(
3476            "metadata key '{}' is reserved for managed trees",
3477            key
3478        )));
3479    }
3480    Ok(())
3481}
3482
3483fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3484    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3485        return Err(RedDBError::Query(format!(
3486            "edge label '{}' is reserved for managed trees",
3487            TREE_CHILD_EDGE_LABEL
3488        )));
3489    }
3490    Ok(())
3491}
3492
3493fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3494    let mut columns = Vec::with_capacity(pairs.len());
3495    let mut values = Vec::with_capacity(pairs.len());
3496
3497    for (column, value) in pairs {
3498        columns.push(column.clone());
3499        values.push(value.clone());
3500    }
3501
3502    (columns, values)
3503}
3504
3505/// Find a required column value and return it as-is.
3506fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3507    for (i, col) in columns.iter().enumerate() {
3508        if col.eq_ignore_ascii_case(name) {
3509            return Ok(values[i].clone());
3510        }
3511    }
3512    Err(RedDBError::Query(format!(
3513        "required column '{name}' not found in INSERT"
3514    )))
3515}
3516
3517/// Find a required column value and coerce to String.
3518fn find_column_value_string(
3519    columns: &[String],
3520    values: &[Value],
3521    name: &str,
3522) -> RedDBResult<String> {
3523    let val = find_column_value(columns, values, name)?;
3524    match val {
3525        Value::Text(s) => Ok(s.to_string()),
3526        Value::Integer(n) => Ok(n.to_string()),
3527        Value::Float(n) => Ok(n.to_string()),
3528        other => Err(RedDBError::Query(format!(
3529            "column '{name}' expected text, got {other:?}"
3530        ))),
3531    }
3532}
3533
3534fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3535    let val = find_column_value(columns, values, name)?;
3536    match val {
3537        Value::Float(n) => Ok(n),
3538        Value::Integer(n) => Ok(n as f64),
3539        Value::UnsignedInteger(n) => Ok(n as f64),
3540        Value::Text(s) => s
3541            .parse::<f64>()
3542            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3543        other => Err(RedDBError::Query(format!(
3544            "column '{name}' expected number, got {other:?}"
3545        ))),
3546    }
3547}
3548
3549/// Find an optional column value as String.
3550fn find_column_value_opt_string(
3551    columns: &[String],
3552    values: &[Value],
3553    name: &str,
3554) -> Option<String> {
3555    for (i, col) in columns.iter().enumerate() {
3556        if col.eq_ignore_ascii_case(name) {
3557            return match &values[i] {
3558                Value::Null => None,
3559                Value::Text(s) => Some(s.to_string()),
3560                Value::Integer(n) => Some(n.to_string()),
3561                Value::Float(n) => Some(n.to_string()),
3562                _ => None,
3563            };
3564        }
3565    }
3566    None
3567}
3568
3569/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
3570///
3571/// Accepts integer literals, decimal strings, and node labels resolved via
3572/// the per-collection graph label index (same source of truth that
3573/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
3574/// labels error so callers can fall back to the numeric id form.
3575fn resolve_edge_endpoint(
3576    store: &crate::storage::unified::UnifiedStore,
3577    collection: &str,
3578    columns: &[String],
3579    values: &[Value],
3580    name: &str,
3581) -> RedDBResult<u64> {
3582    let val = find_column_value(columns, values, name)?;
3583    match val {
3584        Value::Integer(n) => Ok(n as u64),
3585        Value::UnsignedInteger(n) => Ok(n),
3586        Value::Text(s) => {
3587            if let Ok(n) = s.parse::<u64>() {
3588                return Ok(n);
3589            }
3590            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3591            match matches.len() {
3592                0 => Err(RedDBError::Query(format!(
3593                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3594                ))),
3595                1 => Ok(matches[0].raw()),
3596                n => Err(RedDBError::Query(format!(
3597                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3598                ))),
3599            }
3600        }
3601        other => Err(RedDBError::Query(format!(
3602            "column '{name}' expected integer or node label, got {other:?}"
3603        ))),
3604    }
3605}
3606
3607fn resolve_edge_endpoint_any(
3608    store: &crate::storage::unified::UnifiedStore,
3609    collection: &str,
3610    columns: &[String],
3611    values: &[Value],
3612    names: &[&str],
3613) -> RedDBResult<u64> {
3614    for name in names {
3615        if columns
3616            .iter()
3617            .any(|column| column.eq_ignore_ascii_case(name))
3618        {
3619            return resolve_edge_endpoint(store, collection, columns, values, name);
3620        }
3621    }
3622
3623    Err(RedDBError::Query(format!(
3624        "required column '{}' not found in INSERT",
3625        names.first().copied().unwrap_or("from_rid")
3626    )))
3627}
3628
3629/// Find a required column value and coerce to u64.
3630fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3631    let val = find_column_value(columns, values, name)?;
3632    match val {
3633        Value::Integer(n) => Ok(n as u64),
3634        Value::UnsignedInteger(n) => Ok(n),
3635        Value::Text(s) => s
3636            .parse::<u64>()
3637            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3638        other => Err(RedDBError::Query(format!(
3639            "column '{name}' expected integer, got {other:?}"
3640        ))),
3641    }
3642}
3643
3644/// Find an optional column value as f32.
3645fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3646    for (i, col) in columns.iter().enumerate() {
3647        if col.eq_ignore_ascii_case(name) {
3648            return match &values[i] {
3649                Value::Float(n) => Some(*n as f32),
3650                Value::Integer(n) => Some(*n as f32),
3651                Value::Null => None,
3652                _ => None,
3653            };
3654        }
3655    }
3656    None
3657}
3658
3659/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
3660fn find_column_value_vec_f32(
3661    columns: &[String],
3662    values: &[Value],
3663    name: &str,
3664) -> RedDBResult<Vec<f32>> {
3665    let val = find_column_value(columns, values, name)?;
3666    match val {
3667        Value::Vector(v) => Ok(v),
3668        Value::Json(bytes) => {
3669            // Try to parse as JSON array of numbers
3670            let s = std::str::from_utf8(&bytes).map_err(|_| {
3671                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3672            })?;
3673            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3674                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3675            })?;
3676            Ok(arr)
3677        }
3678        other => Err(RedDBError::Query(format!(
3679            "column '{name}' expected vector, got {other:?}"
3680        ))),
3681    }
3682}
3683
3684fn find_column_value_vec_f32_any(
3685    columns: &[String],
3686    values: &[Value],
3687    names: &[&str],
3688) -> RedDBResult<Vec<f32>> {
3689    for name in names {
3690        if columns
3691            .iter()
3692            .any(|column| column.eq_ignore_ascii_case(name))
3693        {
3694            return find_column_value_vec_f32(columns, values, name);
3695        }
3696    }
3697    Err(RedDBError::Query(format!(
3698        "required vector column '{}' not found in INSERT",
3699        names.join("' or '")
3700    )))
3701}
3702
3703/// Extract remaining properties (all columns not in the exclusion list).
3704fn extract_remaining_properties(
3705    columns: &[String],
3706    values: &[Value],
3707    exclude: &[&str],
3708) -> Vec<(String, Value)> {
3709    columns
3710        .iter()
3711        .zip(values.iter())
3712        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3713        .map(|(col, val)| (col.clone(), val.clone()))
3714        .collect()
3715}
3716
3717fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3718    let mut invalid = Vec::new();
3719    for column in columns {
3720        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3721            invalid.push(column.clone());
3722        }
3723    }
3724
3725    if invalid.is_empty() {
3726        Ok(())
3727    } else {
3728        Err(RedDBError::Query(format!(
3729            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3730            invalid.join(", ")
3731        )))
3732    }
3733}
3734
/// Report whether `column` is one of the column names a timeseries INSERT
/// accepts, compared ASCII case-insensitively.
///
/// The comparison is done in place with `eq_ignore_ascii_case`, so no
/// lowercase copy of `column` is allocated per call.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED_COLUMNS: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        // Analytics-event extension (#577): an analytics row carries
        // an `event_name` + JSON `payload`. The payload is validated
        // against the AnalyticsSchemaRegistry inside
        // `insert_timeseries_point` before the row lands.
        "event_name",
        "payload",
    ];

    ACCEPTED_COLUMNS
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
3752
3753fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3754    let mut found = None;
3755
3756    for alias in ["timestamp_ns", "timestamp", "time"] {
3757        for (index, column) in columns.iter().enumerate() {
3758            if !column.eq_ignore_ascii_case(alias) {
3759                continue;
3760            }
3761
3762            if found.is_some() {
3763                return Err(RedDBError::Query(
3764                    "timeseries INSERT accepts only one timestamp column".to_string(),
3765                ));
3766            }
3767
3768            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3769        }
3770    }
3771
3772    Ok(found)
3773}
3774
3775fn find_timeseries_tags(
3776    columns: &[String],
3777    values: &[Value],
3778) -> RedDBResult<std::collections::HashMap<String, String>> {
3779    for (index, column) in columns.iter().enumerate() {
3780        if column.eq_ignore_ascii_case("tags") {
3781            return parse_timeseries_tags(&values[index]);
3782        }
3783    }
3784    Ok(std::collections::HashMap::new())
3785}
3786
3787fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3788    match value {
3789        Value::Null => Ok(std::collections::HashMap::new()),
3790        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3791        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3792        other => Err(RedDBError::Query(format!(
3793            "timeseries tags must be a JSON object or JSON text, got {other:?}"
3794        ))),
3795    }
3796}
3797
3798fn parse_timeseries_tags_json(
3799    bytes: &[u8],
3800) -> RedDBResult<std::collections::HashMap<String, String>> {
3801    let json: crate::json::Value = crate::json::from_slice(bytes)
3802        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3803
3804    let object = match json {
3805        crate::json::Value::Object(object) => object,
3806        other => {
3807            return Err(RedDBError::Query(format!(
3808                "timeseries tags must be a JSON object, got {other:?}"
3809            )))
3810        }
3811    };
3812
3813    let mut tags = std::collections::HashMap::with_capacity(object.len());
3814    for (key, value) in object {
3815        tags.insert(key, json_tag_value_to_string(&value));
3816    }
3817    Ok(tags)
3818}
3819
3820/// Encode a tag value for storage so the original JSON type can be
3821/// recovered on read (issue #543).
3822///
3823/// Time-series tags are stored as `HashMap<String, String>` on the
3824/// physical record (see [`crate::storage::TimeSeriesData`]) so that
3825/// the segment codec, WAL and gRPC mirrors don't need a new value
3826/// variant. To preserve the original JSON type across that
3827/// string-only channel we prepend the
3828/// [`crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX`] marker
3829/// and serialize the value as compact JSON text. The read paths
3830/// (`timeseries_tags_json_value` / `timeseries_tags_value`) detect
3831/// the marker, parse the suffix, and recover a real JSON value.
3832/// Tags written through other channels (Prometheus remote write,
3833/// metrics handlers, legacy on-disk data) lack the marker and are
3834/// returned as `JsonValue::String(raw)` exactly as before.
3835fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3836    let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3837    buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3838    buf.push_str(&value.to_string_compact());
3839    buf
3840}
3841
3842fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3843    match value {
3844        Value::UnsignedInteger(value) => Ok(*value),
3845        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3846        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3847        Value::Text(value) => value.parse::<u64>().map_err(|_| {
3848            RedDBError::Query(format!(
3849                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3850            ))
3851        }),
3852        other => Err(RedDBError::Query(format!(
3853            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3854        ))),
3855    }
3856}
3857
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields 0 if the system clock reads earlier than the epoch, and saturates
/// at `u64::MAX` if the nanosecond count does not fit in 64 bits.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
3865
3866fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3867    use crate::json::{Map, Value as JV};
3868    match value {
3869        MetadataValue::Null => JV::Null,
3870        MetadataValue::Bool(value) => JV::Bool(*value),
3871        MetadataValue::Int(value) => JV::Number(*value as f64),
3872        MetadataValue::Float(value) => JV::Number(*value),
3873        MetadataValue::String(value) => JV::String(value.clone()),
3874        MetadataValue::Bytes(value) => JV::Array(
3875            value
3876                .iter()
3877                .map(|value| JV::Number(*value as f64))
3878                .collect(),
3879        ),
3880        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3881        MetadataValue::Array(values) => {
3882            JV::Array(values.iter().map(metadata_value_to_json).collect())
3883        }
3884        MetadataValue::Object(object) => {
3885            let entries = object
3886                .iter()
3887                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3888                .collect();
3889            JV::Object(entries)
3890        }
3891        MetadataValue::Geo { lat, lon } => {
3892            let mut object = Map::new();
3893            object.insert("lat".to_string(), JV::Number(*lat));
3894            object.insert("lon".to_string(), JV::Number(*lon));
3895            JV::Object(object)
3896        }
3897        MetadataValue::Reference(target) => {
3898            let mut object = Map::new();
3899            object.insert(
3900                "collection".to_string(),
3901                JV::String(target.collection().to_string()),
3902            );
3903            object.insert(
3904                "entity_id".to_string(),
3905                JV::Number(target.entity_id().raw() as f64),
3906            );
3907            JV::Object(object)
3908        }
3909        MetadataValue::References(values) => {
3910            let refs = values
3911                .iter()
3912                .map(|target| {
3913                    let mut object = Map::new();
3914                    object.insert(
3915                        "collection".to_string(),
3916                        JV::String(target.collection().to_string()),
3917                    );
3918                    object.insert(
3919                        "entity_id".to_string(),
3920                        JV::Number(target.entity_id().raw() as f64),
3921                    );
3922                    JV::Object(object)
3923                })
3924                .collect();
3925            JV::Array(refs)
3926        }
3927    }
3928}
3929
3930fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3931    match value {
3932        Value::Null => MetadataValue::Null,
3933        Value::Boolean(value) => MetadataValue::Bool(*value),
3934        Value::Integer(value) => MetadataValue::Int(*value),
3935        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3936        Value::Float(value) => MetadataValue::Float(*value),
3937        Value::Text(value) => MetadataValue::String(value.to_string()),
3938        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3939        Value::Timestamp(value) => {
3940            if *value >= 0 {
3941                metadata_u64_to_value(*value as u64)
3942            } else {
3943                MetadataValue::Int(*value)
3944            }
3945        }
3946        Value::TimestampMs(value) => {
3947            if *value >= 0 {
3948                metadata_u64_to_value(*value as u64)
3949            } else {
3950                MetadataValue::Int(*value)
3951            }
3952        }
3953        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3954        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3955        Value::Date(value) => MetadataValue::String(value.to_string()),
3956        Value::Time(value) => MetadataValue::String(value.to_string()),
3957        Value::Decimal(value) => MetadataValue::String(value.to_string()),
3958        Value::Ipv4(value) => MetadataValue::String(format!(
3959            "{}.{}.{}.{}",
3960            (value >> 24) & 0xFF,
3961            (value >> 16) & 0xFF,
3962            (value >> 8) & 0xFF,
3963            value & 0xFF
3964        )),
3965        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3966        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3967        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3968        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3969            lat: *lat as f64 / 1_000_000.0,
3970            lon: *lon as f64 / 1_000_000.0,
3971        },
3972        Value::BigInt(value) => MetadataValue::String(value.to_string()),
3973        Value::TableRef(value) => MetadataValue::String(value.clone()),
3974        Value::PageRef(value) => MetadataValue::Int(*value as i64),
3975        Value::Password(value) => MetadataValue::String(value.clone()),
3976        Value::Array(values) => {
3977            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3978        }
3979        _ => MetadataValue::String(value.to_string()),
3980    }
3981}
3982
3983fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3984    match value {
3985        Value::Null => Ok(MetadataValue::Null),
3986        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3987        Value::Integer(_) => Err(RedDBError::Query(format!(
3988            "column '{field}' must be non-negative for TTL metadata"
3989        ))),
3990        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3991        Value::Float(value) if value.is_finite() => {
3992            if value.fract().abs() >= f64::EPSILON {
3993                return Err(RedDBError::Query(format!(
3994                    "column '{field}' must be an integer (TTL metadata must be an integer)"
3995                )));
3996            }
3997            if *value < 0.0 {
3998                return Err(RedDBError::Query(format!(
3999                    "column '{field}' must be non-negative for TTL metadata"
4000                )));
4001            }
4002            if *value > u64::MAX as f64 {
4003                return Err(RedDBError::Query(format!(
4004                    "column '{field}' value is too large"
4005                )));
4006            }
4007            Ok(metadata_u64_to_value(*value as u64))
4008        }
4009        Value::Float(_) => Err(RedDBError::Query(format!(
4010            "column '{field}' must be a finite number"
4011        ))),
4012        Value::Text(value) => {
4013            let value = value.trim();
4014            if let Ok(value) = value.parse::<u64>() {
4015                Ok(metadata_u64_to_value(value))
4016            } else if let Ok(value) = value.parse::<i64>() {
4017                if value < 0 {
4018                    return Err(RedDBError::Query(format!(
4019                        "column '{field}' must be non-negative for TTL metadata"
4020                    )));
4021                }
4022                Ok(metadata_u64_to_value(value as u64))
4023            } else if let Ok(value) = value.parse::<f64>() {
4024                if !value.is_finite() {
4025                    return Err(RedDBError::Query(format!(
4026                        "column '{field}' must be a finite number"
4027                    )));
4028                }
4029                if value.fract().abs() >= f64::EPSILON {
4030                    return Err(RedDBError::Query(format!(
4031                        "column '{field}' must be an integer (TTL metadata must be an integer)"
4032                    )));
4033                }
4034                if value < 0.0 {
4035                    return Err(RedDBError::Query(format!(
4036                        "column '{field}' must be non-negative for TTL metadata"
4037                    )));
4038                }
4039                if value > u64::MAX as f64 {
4040                    return Err(RedDBError::Query(format!(
4041                        "column '{field}' value is too large"
4042                    )));
4043                }
4044                Ok(metadata_u64_to_value(value as u64))
4045            } else {
4046                Err(RedDBError::Query(format!(
4047                    "column '{field}' expects a numeric value for TTL metadata"
4048                )))
4049            }
4050        }
4051        _ => Err(RedDBError::Query(format!(
4052            "column '{field}' expects a numeric value for TTL metadata"
4053        ))),
4054    }
4055}
4056
4057fn metadata_u64_to_value(value: u64) -> MetadataValue {
4058    if value <= i64::MAX as u64 {
4059        MetadataValue::Int(value as i64)
4060    } else {
4061        MetadataValue::Timestamp(value)
4062    }
4063}
4064
4065/// Phase 2 PG parity: inspect a column value and return `true` when
4066/// the dotted `tail` path is already present under it. Used by the
4067/// tenant auto-fill so rows that already carry an explicit value
4068/// (bulk import, admin insert on behalf of a tenant) are not
4069/// double-stamped with the session's current_tenant().
4070fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
4071    let json = match value {
4072        Value::Null => return false,
4073        Value::Json(bytes) | Value::Blob(bytes) => {
4074            match crate::json::from_slice::<crate::json::Value>(bytes) {
4075                Ok(v) => v,
4076                Err(_) => return false,
4077            }
4078        }
4079        Value::Text(s) => {
4080            let trimmed = s.trim_start();
4081            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
4082                return false;
4083            }
4084            match crate::json::from_str::<crate::json::Value>(s) {
4085                Ok(v) => v,
4086                Err(_) => return false,
4087            }
4088        }
4089        _ => return false,
4090    };
4091    let mut cursor = &json;
4092    for seg in tail.split('.') {
4093        match cursor {
4094            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
4095                Some((_, v)) => cursor = v,
4096                None => return false,
4097            },
4098            _ => return false,
4099        }
4100    }
4101    !matches!(cursor, crate::json::Value::Null)
4102}
4103
/// Phase 2 PG parity: take a column value (possibly Null / Text /
/// Json / Blob) and return a `Value::Json` with the dotted `tail` path
/// set to `tenant_id`. Keys other than the final path segment are kept;
/// the final segment is inserted as a JSON string (presumably replacing
/// any existing entry — confirm against the JSON map's `insert`).
///
/// Accepts:
/// * `Value::Null`  → fresh `{tail: tenant_id}` object
/// * `Value::Json(bytes)` / `Value::Blob(bytes)` → parse, navigate /
///   create path, re-serialize
/// * `Value::Text(s)` if `s` is blank → treated like Null; otherwise `s`
///   must be valid JSON → same as Json
/// * anything else → error (user supplied a scalar where we need
///   a JSON container)
///
/// # Errors
/// Returns `RedDBError::Query` when the root is not valid JSON, when the
/// root or an intermediate path value is not a JSON object, or when the
/// result cannot be re-serialized.
fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
    // Normalize the incoming column value into a mutable JSON root.
    let mut root = match current {
        Value::Null => crate::json::Value::Object(Default::default()),
        Value::Json(bytes) | Value::Blob(bytes) => {
            crate::json::from_slice(&bytes).map_err(|err| {
                RedDBError::Query(format!(
                    "tenant auto-fill: root column is not valid JSON ({err})"
                ))
            })?
        }
        Value::Text(s) => {
            // Blank text is treated as "no document yet".
            if s.trim().is_empty() {
                crate::json::Value::Object(Default::default())
            } else {
                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
                    RedDBError::Query(format!(
                        "tenant auto-fill: text root is not valid JSON ({err})"
                    ))
                })?
            }
        }
        other => {
            return Err(RedDBError::Query(format!(
                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
            )));
        }
    };

    // Navigate path segments, creating intermediate objects on demand.
    let segments: Vec<&str> = tail.split('.').collect();
    let mut cursor: &mut crate::json::Value = &mut root;
    for (i, seg) in segments.iter().enumerate() {
        let is_last = i + 1 == segments.len();
        // Every step, including the last, requires the current node to be an object.
        let map = match cursor {
            crate::json::Value::Object(m) => m,
            _ => {
                return Err(RedDBError::Query(format!(
                    "tenant auto-fill: segment '{seg}' is not inside an object"
                )));
            }
        };
        if is_last {
            // Final segment: write the tenant id as a JSON string and stop.
            map.insert(
                seg.to_string(),
                crate::json::Value::String(tenant_id.to_string()),
            );
            break;
        }
        // Intermediate segment: descend, creating an empty object if absent.
        cursor = map
            .entry(seg.to_string())
            .or_insert_with(|| crate::json::Value::Object(Default::default()));
    }

    // Always hand back a `Value::Json`, regardless of the input variant.
    let bytes = crate::json::to_vec(&root).map_err(|err| {
        RedDBError::Query(format!(
            "tenant auto-fill: failed to re-serialize JSON ({err})"
        ))
    })?;
    Ok(Value::Json(bytes))
}
4174
4175#[cfg(test)]
4176mod tests {
4177    use crate::storage::schema::Value;
4178    use crate::storage::wal::{WalReader, WalRecord};
4179    use crate::{RedDBOptions, RedDBRuntime};
4180    use std::path::Path;
4181
4182    fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4183        WalReader::open(wal_path)
4184            .expect("wal opens")
4185            .iter()
4186            .map(|record| record.expect("wal record decodes").1)
4187            .filter_map(|record| match record {
4188                WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4189                _ => None,
4190            })
4191            .collect()
4192    }
4193
4194    fn action_contains_text(action: &[u8], needle: &str) -> bool {
4195        action
4196            .windows(needle.len())
4197            .any(|window| window == needle.as_bytes())
4198    }
4199
4200    fn assert_statement_writes_collections_in_one_new_wal_batch(
4201        rt: &RedDBRuntime,
4202        wal_path: &Path,
4203        statement: &str,
4204        source: &str,
4205        event_queue: &str,
4206    ) {
4207        let before_batches = store_commit_batches(wal_path).len();
4208
4209        rt.execute_query(statement).unwrap();
4210
4211        let batches = store_commit_batches(wal_path);
4212        let statement_batches = &batches[before_batches..];
4213        let source_batch = statement_batches
4214            .iter()
4215            .position(|actions| {
4216                actions.iter().any(|action| {
4217                    action_contains_text(action, source)
4218                        && !action_contains_text(action, event_queue)
4219                })
4220            })
4221            .expect("source collection write batch is present");
4222        let event_batch = statement_batches
4223            .iter()
4224            .position(|actions| {
4225                actions
4226                    .iter()
4227                    .any(|action| action_contains_text(action, event_queue))
4228            })
4229            .expect("event queue write batch is present");
4230
4231        assert_eq!(
4232            source_batch, event_batch,
4233            "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4234        );
4235    }
4236
4237    #[test]
4238    fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4239        let dir = tempfile::tempdir().unwrap();
4240        let db_path = dir.path().join("events_dual_write.rdb");
4241        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4242        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4243
4244        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4245            .unwrap();
4246        assert_statement_writes_collections_in_one_new_wal_batch(
4247            &rt,
4248            &wal_path,
4249            "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4250            "users",
4251            "users_events",
4252        );
4253    }
4254
4255    #[test]
4256    fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4257        let dir = tempfile::tempdir().unwrap();
4258        let db_path = dir.path().join("events_update_atomic.rdb");
4259        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4260        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4261
4262        rt.execute_query(
4263            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4264        )
4265        .unwrap();
4266        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4267            .unwrap();
4268
4269        assert_statement_writes_collections_in_one_new_wal_batch(
4270            &rt,
4271            &wal_path,
4272            "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4273            "users",
4274            "user_updates",
4275        );
4276    }
4277
4278    #[test]
4279    fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4280        let dir = tempfile::tempdir().unwrap();
4281        let db_path = dir.path().join("events_delete_atomic.rdb");
4282        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4283        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4284
4285        rt.execute_query(
4286            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4287        )
4288        .unwrap();
4289        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4290            .unwrap();
4291
4292        assert_statement_writes_collections_in_one_new_wal_batch(
4293            &rt,
4294            &wal_path,
4295            "DELETE FROM users WHERE id = 1",
4296            "users",
4297            "user_deletes",
4298        );
4299    }
4300
4301    #[test]
4302    fn update_where_id_in_with_hash_index_updates_expected_rows() {
4303        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4304        rt.execute_query("CREATE TABLE users (id INT, score INT)")
4305            .unwrap();
4306        for id in 0..5 {
4307            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4308                .unwrap();
4309        }
4310        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4311            .unwrap();
4312
4313        let updated = rt
4314            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4315            .unwrap();
4316        assert_eq!(updated.affected_rows, 3);
4317
4318        let selected = rt
4319            .execute_query("SELECT id, score FROM users ORDER BY id")
4320            .unwrap();
4321        let scores: Vec<(i64, i64)> = selected
4322            .result
4323            .records
4324            .iter()
4325            .map(|record| {
4326                let id = match record.get("id").unwrap() {
4327                    Value::Integer(value) => *value,
4328                    other => panic!("expected integer id, got {other:?}"),
4329                };
4330                let score = match record.get("score").unwrap() {
4331                    Value::Integer(value) => *value,
4332                    other => panic!("expected integer score, got {other:?}"),
4333                };
4334                (id, score)
4335            })
4336            .collect();
4337        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4338    }
4339
4340    /// Drives UPDATE through the shared `DmlTargetScan` module — the
4341    /// same code path DELETE uses (#51, #52). Exercises the indexed
4342    /// equality fast-path (WHERE id = N with a HASH index), the
4343    /// unindexed range scan (WHERE score > N), and the no-WHERE
4344    /// full-scan branch to confirm the extracted "find target rows"
4345    /// loop preserves affected-row counts and the resulting row state.
4346    #[test]
4347    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4348        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4349        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4350            .unwrap();
4351        for id in 0..5 {
4352            rt.execute_query(&format!(
4353                "INSERT INTO items (id, score) VALUES ({id}, {})",
4354                id * 10
4355            ))
4356            .unwrap();
4357        }
4358        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4359            .unwrap();
4360
4361        // Indexed equality UPDATE — hits the hash fast-path inside
4362        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
4363        // below the score>25 cutoff so the next assertion stays clean.
4364        let updated_one = rt
4365            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4366            .unwrap();
4367        assert_eq!(updated_one.affected_rows, 1);
4368
4369        // Unindexed scan UPDATE — bumps everyone with score > 25,
4370        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4371        // zoned/full-scan branch.
4372        let updated_many = rt
4373            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4374            .unwrap();
4375        assert_eq!(updated_many.affected_rows, 2);
4376
4377        let snapshot = rt
4378            .execute_query("SELECT id, score FROM items ORDER BY id")
4379            .unwrap();
4380        let pairs: Vec<(i64, i64)> = snapshot
4381            .result
4382            .records
4383            .iter()
4384            .map(|record| {
4385                let id = match record.get("id").unwrap() {
4386                    Value::Integer(value) => *value,
4387                    other => panic!("expected integer id, got {other:?}"),
4388                };
4389                let score = match record.get("score").unwrap() {
4390                    Value::Integer(value) => *value,
4391                    other => panic!("expected integer score, got {other:?}"),
4392                };
4393                (id, score)
4394            })
4395            .collect();
4396        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4397
4398        // Full-scan UPDATE with no WHERE rewrites every remaining row.
4399        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4400        assert_eq!(updated_all.affected_rows, 5);
4401        let after = rt
4402            .execute_query("SELECT score FROM items ORDER BY id")
4403            .unwrap();
4404        let scores: Vec<i64> = after
4405            .result
4406            .records
4407            .iter()
4408            .map(|record| match record.get("score").unwrap() {
4409                Value::Integer(value) => *value,
4410                other => panic!("expected integer score, got {other:?}"),
4411            })
4412            .collect();
4413        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4414    }
4415
4416    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
4417    /// both the index fast-path (WHERE id = N with a HASH index) and
4418    /// the unindexed scan path (WHERE score > N) to confirm the
4419    /// extracted "find target rows" loop preserves the affected-row
4420    /// count and which rows survive.
4421    #[test]
4422    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4423        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4424        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4425            .unwrap();
4426        for id in 0..5 {
4427            rt.execute_query(&format!(
4428                "INSERT INTO items (id, score) VALUES ({id}, {})",
4429                id * 10
4430            ))
4431            .unwrap();
4432        }
4433        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4434            .unwrap();
4435
4436        // Indexed equality DELETE — hits the hash fast-path inside
4437        // DmlTargetScan::find_target_ids.
4438        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4439        assert_eq!(deleted_one.affected_rows, 1);
4440
4441        // Unindexed scan DELETE — drops everyone with score > 25,
4442        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4443        // zoned/full-scan branch.
4444        let deleted_many = rt
4445            .execute_query("DELETE FROM items WHERE score > 25")
4446            .unwrap();
4447        assert_eq!(deleted_many.affected_rows, 2);
4448
4449        let surviving = rt
4450            .execute_query("SELECT id FROM items ORDER BY id")
4451            .unwrap();
4452        let ids: Vec<i64> = surviving
4453            .result
4454            .records
4455            .iter()
4456            .map(|record| match record.get("id").unwrap() {
4457                Value::Integer(value) => *value,
4458                other => panic!("expected integer id, got {other:?}"),
4459            })
4460            .collect();
4461        assert_eq!(ids, vec![0, 1]);
4462
4463        // Sanity: full-scan DELETE with no WHERE clears the rest.
4464        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4465        assert_eq!(deleted_rest.affected_rows, 2);
4466        let empty = rt.execute_query("SELECT id FROM items").unwrap();
4467        assert!(empty.result.records.is_empty());
4468    }
4469
4470    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
4471    /// INSERT but reject UPDATE and DELETE with the documented
4472    /// operator-facing error strings. Drives all three DML verbs so
4473    /// the centralized gate is exercised end-to-end.
4474    #[test]
4475    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4476        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4477        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4478            .unwrap();
4479
4480        // INSERT must succeed — APPEND ONLY exists precisely to allow
4481        // appends. The gate should be a no-op for INSERT.
4482        let inserted = rt
4483            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4484            .unwrap();
4485        assert_eq!(inserted.affected_rows, 1);
4486
4487        // UPDATE is rejected with the gate's UPDATE-specific message.
4488        let update_err = rt
4489            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4490            .unwrap_err();
4491        let msg = format!("{update_err}");
4492        assert!(
4493            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4494            "expected UPDATE rejection message, got: {msg}"
4495        );
4496
4497        // DELETE is rejected with the gate's DELETE-specific message.
4498        let delete_err = rt
4499            .execute_query("DELETE FROM events WHERE id = 1")
4500            .unwrap_err();
4501        let msg = format!("{delete_err}");
4502        assert!(
4503            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4504            "expected DELETE rejection message, got: {msg}"
4505        );
4506
4507        // Row should still be present — neither rejected mutation
4508        // touched storage.
4509        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4510        assert_eq!(surviving.result.records.len(), 1);
4511    }
4512
4513    /// CollectionContract gate: tables without an APPEND ONLY contract
4514    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
4515    /// is a true pass-through, not an accidental block.
4516    #[test]
4517    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4518        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4519        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4520            .unwrap();
4521
4522        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4523            .unwrap();
4524        let updated = rt
4525            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4526            .unwrap();
4527        assert_eq!(updated.affected_rows, 1);
4528        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4529        assert_eq!(deleted.affected_rows, 1);
4530    }
4531
4532    #[test]
4533    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4534        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4535        rt.execute_query(
4536            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4537        )
4538        .unwrap();
4539
4540        let inserted = rt
4541            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4542            .unwrap();
4543        assert_eq!(inserted.affected_rows, 1);
4544
4545        let events = queue_payloads(&rt, "audit_log");
4546        assert_eq!(events.len(), 1);
4547        let event = events[0].as_object().expect("event payload object");
4548        assert!(event
4549            .get("event_id")
4550            .and_then(crate::json::Value::as_str)
4551            .is_some_and(|value| !value.is_empty()));
4552        assert_eq!(
4553            event.get("op").and_then(crate::json::Value::as_str),
4554            Some("insert")
4555        );
4556        assert_eq!(
4557            event.get("collection").and_then(crate::json::Value::as_str),
4558            Some("users")
4559        );
4560        assert_eq!(
4561            event.get("id").and_then(crate::json::Value::as_u64),
4562            Some(7)
4563        );
4564        assert!(event
4565            .get("ts")
4566            .and_then(crate::json::Value::as_u64)
4567            .is_some());
4568        assert!(event
4569            .get("lsn")
4570            .and_then(crate::json::Value::as_u64)
4571            .is_some());
4572        assert!(matches!(
4573            event.get("tenant"),
4574            Some(crate::json::Value::Null)
4575        ));
4576        assert!(matches!(
4577            event.get("before"),
4578            Some(crate::json::Value::Null)
4579        ));
4580        let after = event
4581            .get("after")
4582            .and_then(crate::json::Value::as_object)
4583            .expect("after object");
4584        assert_eq!(
4585            after.get("id").and_then(crate::json::Value::as_u64),
4586            Some(7)
4587        );
4588        assert_eq!(
4589            after.get("email").and_then(crate::json::Value::as_str),
4590            Some("a@example.com")
4591        );
4592    }
4593
4594    #[test]
4595    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4596        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4597        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4598            .unwrap();
4599
4600        rt.execute_query(
4601            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4602        )
4603        .unwrap();
4604
4605        let events = queue_payloads(&rt, "users_events");
4606        assert_eq!(events.len(), 2);
4607        let mut previous_lsn = 0;
4608        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4609            let object = event.as_object().expect("event payload object");
4610            assert_eq!(
4611                object.get("op").and_then(crate::json::Value::as_str),
4612                Some("insert")
4613            );
4614            assert_eq!(
4615                object.get("id").and_then(crate::json::Value::as_u64),
4616                Some(expected_id)
4617            );
4618            let lsn = object
4619                .get("lsn")
4620                .and_then(crate::json::Value::as_u64)
4621                .expect("event lsn");
4622            assert!(
4623                lsn > previous_lsn,
4624                "event LSNs should increase in row order"
4625            );
4626            previous_lsn = lsn;
4627            let after = object
4628                .get("after")
4629                .and_then(crate::json::Value::as_object)
4630                .expect("after object");
4631            assert_eq!(
4632                after.get("id").and_then(crate::json::Value::as_u64),
4633                Some(expected_id)
4634            );
4635        }
4636    }
4637
4638    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4639        let result = rt
4640            .execute_query(&format!("QUEUE PEEK {queue} 10"))
4641            .expect("peek queue");
4642        result
4643            .result
4644            .records
4645            .iter()
4646            .map(
4647                |record| match record.get("payload").expect("payload column") {
4648                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4649                    other => panic!("expected JSON queue payload, got {other:?}"),
4650                },
4651            )
4652            .collect()
4653    }
4654
4655    // ── #112: auto-index user `id` on first insert ─────────────────────
4656
4657    /// First insert into a fresh collection that carries a column named
4658    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
4659    /// populate it transparently, and `WHERE id = N` lookups exercise
4660    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
4661    ///
4662    /// This is the load-bearing acceptance test for #112 — without the
4663    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
4664    /// fall through to a full segment scan (the 4× perf gap documented
4665    /// in `docs/perf/delete-sequential-2026-05-06.md`).
4666    #[test]
4667    fn auto_index_id_fires_on_first_insert() {
4668        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4669        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4670            .unwrap();
4671
4672        // Pre-condition: no index on `id` yet.
4673        assert!(
4674            rt.index_store_ref()
4675                .find_index_for_column("bench_users", "id")
4676                .is_none(),
4677            "freshly created collection should not have an `id` index"
4678        );
4679
4680        // Single-row INSERT — drives `MutationEngine::append_one`.
4681        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4682            .unwrap();
4683
4684        // Post-condition: hash index registered on `id`.
4685        let registered = rt
4686            .index_store_ref()
4687            .find_index_for_column("bench_users", "id")
4688            .expect("auto-index hook should have registered idx_id on first insert");
4689        assert_eq!(registered.name, "idx_id");
4690        assert_eq!(registered.collection, "bench_users");
4691        assert_eq!(registered.columns, vec!["id".to_string()]);
4692        assert!(matches!(
4693            registered.method,
4694            super::super::index_store::IndexMethodKind::Hash
4695        ));
4696
4697        // Subsequent inserts populate the index; `WHERE id = N` should
4698        // resolve via the hash fast path and round-trip every row.
4699        for id in 2..=5 {
4700            rt.execute_query(&format!(
4701                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4702                id * 10
4703            ))
4704            .unwrap();
4705        }
4706        for id in 1..=5 {
4707            let result = rt
4708                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4709                .unwrap();
4710            assert_eq!(
4711                result.result.records.len(),
4712                1,
4713                "id={id} should match one row"
4714            );
4715        }
4716
4717        // Delete via the hash fast-path — exactly the bench scenario the
4718        // perf doc identified as the 4× regression. With the index
4719        // present, `find_target_ids` short-circuits before
4720        // `for_each_entity_zoned` runs.
4721        let deleted = rt
4722            .execute_query("DELETE FROM bench_users WHERE id = 3")
4723            .unwrap();
4724        assert_eq!(deleted.affected_rows, 1);
4725    }
4726
4727    /// Bulk INSERT (the multi-row VALUES path) drives
4728    /// `MutationEngine::append_batch`. The hook must fire there too —
4729    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
4730    /// wire bulk INSERT) skip auto-indexing entirely.
4731    #[test]
4732    fn auto_index_id_fires_on_first_bulk_insert() {
4733        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4734        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4735            .unwrap();
4736
4737        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4738            .unwrap();
4739
4740        let registered = rt
4741            .index_store_ref()
4742            .find_index_for_column("bench_bulk", "id")
4743            .expect("auto-index hook should fire on first bulk insert");
4744        assert_eq!(registered.name, "idx_id");
4745
4746        // Every row populated via `index_entity_insert_batch`.
4747        for id in 1..=3 {
4748            let result = rt
4749                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4750                .unwrap();
4751            assert_eq!(result.result.records.len(), 1);
4752        }
4753    }
4754
4755    /// Hook is a no-op when the row carries no `id` column. Conservative
4756    /// match (case-sensitive `id`) — `Id`, `ID`, and `red_entity_id`
4757    /// don't trigger it.
4758    #[test]
4759    fn auto_index_id_skips_when_no_id_column() {
4760        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4761        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4762            .unwrap();
4763        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4764            .unwrap();
4765
4766        assert!(rt
4767            .index_store_ref()
4768            .find_index_for_column("plain", "id")
4769            .is_none());
4770        assert!(rt
4771            .index_store_ref()
4772            .find_index_for_column("plain", "uid")
4773            .is_none());
4774    }
4775
4776    /// Hook only fires once per collection. If an explicit
4777    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
4778    /// detects it via `find_index_for_column` and does NOT clobber it
4779    /// with a HASH index on the next insert.
4780    #[test]
4781    fn auto_index_id_skips_when_index_already_exists() {
4782        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4783        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4784            .unwrap();
4785        // User-declared BTREE index on `id` before any insert.
4786        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4787            .unwrap();
4788        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4789            .unwrap();
4790
4791        let registered = rt
4792            .index_store_ref()
4793            .find_index_for_column("pre", "id")
4794            .expect("user index should still be there");
4795        assert_eq!(
4796            registered.name, "user_idx",
4797            "auto-index hook must not overwrite an existing index"
4798        );
4799    }
4800
4801    /// Implicit `idx_id` is reaped when the collection drops. The
4802    /// existing `execute_drop_table` walks `list_indices` and drops every
4803    /// entry — confirm the auto-created index participates.
4804    #[test]
4805    fn auto_index_id_dropped_with_collection() {
4806        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4807        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4808            .unwrap();
4809        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4810            .unwrap();
4811        assert!(rt
4812            .index_store_ref()
4813            .find_index_for_column("ephemeral", "id")
4814            .is_some());
4815
4816        rt.execute_query("DROP TABLE ephemeral").unwrap();
4817
4818        assert!(
4819            rt.index_store_ref()
4820                .find_index_for_column("ephemeral", "id")
4821                .is_none(),
4822            "implicit `idx_id` must be reaped when its collection drops"
4823        );
4824    }
4825
4826    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
4827    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
4828    /// off, first insert leaves the collection without an `id` index —
4829    /// DELETE/UPDATE fall back to the scan path.
4830    #[test]
4831    fn auto_index_id_disabled_by_config() {
4832        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4833        let rt = RedDBRuntime::with_options(opts).unwrap();
4834
4835        rt.execute_query("CREATE TABLE off (id INT, score INT)")
4836            .unwrap();
4837        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4838            .unwrap();
4839
4840        assert!(
4841            rt.index_store_ref()
4842                .find_index_for_column("off", "id")
4843                .is_none(),
4844            "with auto_index_id=false, no implicit index should be created"
4845        );
4846    }
4847
4848    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
4849
4850    #[test]
4851    fn update_single_row_emits_update_event() {
4852        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4853        rt.execute_query(
4854            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4855        )
4856        .unwrap();
4857        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4858            .unwrap();
4859
4860        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4861            .unwrap();
4862
4863        let events = queue_payloads(&rt, "audit_log");
4864        assert_eq!(events.len(), 1, "expected exactly 1 update event");
4865        let event = events[0].as_object().expect("event payload object");
4866        assert_eq!(
4867            event.get("op").and_then(crate::json::Value::as_str),
4868            Some("update")
4869        );
4870        assert_eq!(
4871            event.get("collection").and_then(crate::json::Value::as_str),
4872            Some("users")
4873        );
4874        assert!(event
4875            .get("event_id")
4876            .and_then(crate::json::Value::as_str)
4877            .is_some_and(|v| !v.is_empty()));
4878        let before = event
4879            .get("before")
4880            .and_then(crate::json::Value::as_object)
4881            .expect("before must be an object");
4882        let after = event
4883            .get("after")
4884            .and_then(crate::json::Value::as_object)
4885            .expect("after must be an object");
4886        assert_eq!(
4887            before.get("name").and_then(crate::json::Value::as_str),
4888            Some("Alice"),
4889            "before.name should be the old value"
4890        );
4891        assert_eq!(
4892            after.get("name").and_then(crate::json::Value::as_str),
4893            Some("Bob"),
4894            "after.name should be the new value"
4895        );
4896    }
4897
4898    #[test]
4899    fn update_event_only_includes_changed_fields() {
4900        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4901        rt.execute_query(
4902            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4903        )
4904        .unwrap();
4905        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4906            .unwrap();
4907
4908        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4909            .unwrap();
4910
4911        let events = queue_payloads(&rt, "evts");
4912        assert_eq!(events.len(), 1);
4913        let event = events[0].as_object().unwrap();
4914        let before = event
4915            .get("before")
4916            .and_then(crate::json::Value::as_object)
4917            .unwrap();
4918        let after = event
4919            .get("after")
4920            .and_then(crate::json::Value::as_object)
4921            .unwrap();
4922        // Only changed field included.
4923        assert!(
4924            before.contains_key("name"),
4925            "before must include changed field"
4926        );
4927        assert!(
4928            after.contains_key("name"),
4929            "after must include changed field"
4930        );
4931        // Unchanged fields must not appear.
4932        assert!(
4933            !before.contains_key("email"),
4934            "before must not include unchanged email"
4935        );
4936        assert!(
4937            !after.contains_key("email"),
4938            "after must not include unchanged email"
4939        );
4940    }
4941
4942    #[test]
4943    fn multi_row_update_emits_one_event_per_row() {
4944        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4945        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4946            .unwrap();
4947        rt.execute_query(
4948            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4949        )
4950        .unwrap();
4951
4952        rt.execute_query("UPDATE items SET status = 'done'")
4953            .unwrap();
4954
4955        let events = queue_payloads(&rt, "evts");
4956        assert_eq!(events.len(), 3, "expected one update event per row");
4957        for event in &events {
4958            let obj = event.as_object().unwrap();
4959            assert_eq!(
4960                obj.get("op").and_then(crate::json::Value::as_str),
4961                Some("update")
4962            );
4963        }
4964    }
4965
4966    #[test]
4967    fn delete_single_row_emits_delete_event() {
4968        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4969        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4970            .unwrap();
4971        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4972            .unwrap();
4973
4974        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4975
4976        let events = queue_payloads(&rt, "del_log");
4977        assert_eq!(events.len(), 1);
4978        let event = events[0].as_object().expect("event payload object");
4979        assert_eq!(
4980            event.get("op").and_then(crate::json::Value::as_str),
4981            Some("delete")
4982        );
4983        assert_eq!(
4984            event.get("collection").and_then(crate::json::Value::as_str),
4985            Some("users")
4986        );
4987        assert!(event
4988            .get("event_id")
4989            .and_then(crate::json::Value::as_str)
4990            .is_some_and(|v| !v.is_empty()));
4991        let before = event
4992            .get("before")
4993            .and_then(crate::json::Value::as_object)
4994            .expect("before must be an object for delete");
4995        assert_eq!(
4996            before.get("id").and_then(crate::json::Value::as_u64),
4997            Some(42)
4998        );
4999        assert_eq!(
5000            before.get("name").and_then(crate::json::Value::as_str),
5001            Some("Alice")
5002        );
5003        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
5004    }
5005
5006    #[test]
5007    fn multi_row_delete_emits_one_event_per_row() {
5008        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5009        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
5010            .unwrap();
5011        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
5012            .unwrap();
5013
5014        rt.execute_query("DELETE FROM items").unwrap();
5015
5016        let events = queue_payloads(&rt, "del_log");
5017        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
5018        for event in &events {
5019            let obj = event.as_object().unwrap();
5020            assert_eq!(
5021                obj.get("op").and_then(crate::json::Value::as_str),
5022                Some("delete")
5023            );
5024            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
5025        }
5026    }
5027
5028    #[test]
5029    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
5030        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5031        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
5032            .unwrap();
5033
5034        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5035            .unwrap();
5036        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
5037
5038        let events = queue_payloads(&rt, "evts");
5039        assert!(
5040            events.is_empty(),
5041            "UPDATE-only filter must not emit INSERT or DELETE events"
5042        );
5043    }
5044
5045    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
5046
5047    #[test]
5048    fn suppress_events_on_insert_emits_no_events() {
5049        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5050        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5051            .unwrap();
5052
5053        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5054            .unwrap();
5055
5056        let events = queue_payloads(&rt, "evts");
5057        assert!(
5058            events.is_empty(),
5059            "SUPPRESS EVENTS must prevent INSERT events"
5060        );
5061    }
5062
5063    #[test]
5064    fn suppress_events_on_update_emits_no_events() {
5065        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5066        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5067            .unwrap();
5068        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5069            .unwrap();
5070        // drain the INSERT event
5071        let _ = queue_payloads(&rt, "evts");
5072        // Force pop to drain; simpler: just check new count after UPDATE
5073        rt.execute_query("QUEUE PURGE evts").unwrap();
5074
5075        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
5076            .unwrap();
5077
5078        let events = queue_payloads(&rt, "evts");
5079        assert!(
5080            events.is_empty(),
5081            "SUPPRESS EVENTS must prevent UPDATE events"
5082        );
5083    }
5084
5085    #[test]
5086    fn suppress_events_on_delete_emits_no_events() {
5087        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5088        rt.execute_query(
5089            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
5090        )
5091        .unwrap();
5092        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5093            .unwrap();
5094
5095        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
5096            .unwrap();
5097
5098        let events = queue_payloads(&rt, "evts");
5099        assert!(
5100            events.is_empty(),
5101            "SUPPRESS EVENTS must prevent DELETE events"
5102        );
5103    }
5104
5105    #[test]
5106    fn normal_insert_after_suppress_still_emits() {
5107        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5108        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5109            .unwrap();
5110
5111        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5112            .unwrap();
5113        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
5114            .unwrap();
5115
5116        let events = queue_payloads(&rt, "evts");
5117        assert_eq!(
5118            events.len(),
5119            1,
5120            "only the non-suppressed INSERT should emit"
5121        );
5122        assert_eq!(
5123            events[0].get("id").and_then(crate::json::Value::as_u64),
5124            Some(2)
5125        );
5126    }
5127}