Skip to main content

reddb_server/runtime/
impl_dml.rs

1//! DML execution: INSERT, UPDATE, DELETE via SQL AST
2//!
3//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
4//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
5//! operations through the existing `RuntimeEntityPort` trait so that all
6//! cross-cutting concerns (WAL, indexing, replication) are automatically
7//! applied.
8
9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11    CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12    CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13    RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16    build_row_update_contract_plan, entity_row_fields_snapshot,
17    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18    RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27    sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_rid, sys_key_tenant,
28    sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
/// Chunk size used when applying UPDATE mutations.
/// NOTE(review): presumably the maximum number of rows handled per
/// apply/flush batch — the consumer is outside this view, confirm there.
const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
/// Edge label string for tree child edges.
/// NOTE(review): usage is outside this view — confirm against the tree code.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Key prefix for tree-related metadata entries.
/// NOTE(review): usage is outside this view — confirm against the tree code.
const TREE_METADATA_PREFIX: &str = "red.tree.";
40
/// A single UPDATE assignment, as stored in
/// `CompiledUpdatePlan::dynamic_assignments`.
///
/// NOTE(review): the code that builds and evaluates these values is outside
/// this view; the field notes below are inferred from names and types and
/// should be confirmed against it.
#[derive(Clone)]
struct CompiledUpdateAssignment {
    /// Name of the column being assigned.
    column: String,
    /// Expression on the right-hand side of the assignment.
    expr: Expr,
    /// Binary operator for a compound assignment, if any (presumably forms
    /// such as `col = col <op> expr` — confirm).
    compound_op: Option<BinOp>,
    /// Set when the assignment presumably targets a metadata key rather than
    /// a row field — confirm.
    metadata_key: Option<&'static str>,
    /// Row-update contract rule attached to this column, when one exists.
    row_rule: Option<RowUpdateColumnRule>,
}
49
/// Pre-compiled form of an UPDATE statement's assignments.
///
/// NOTE(review): the builder and consumer are outside this view; the split
/// between "static" and "dynamic" is presumably "value known up front" versus
/// "expression evaluated per row" — confirm.
struct CompiledUpdatePlan {
    /// Field assignments stored as ready `(column, value)` pairs.
    static_field_assignments: Vec<(String, Value)>,
    /// Metadata assignments stored as ready `(key, value)` pairs.
    static_metadata_assignments: Vec<(String, MetadataValue)>,
    /// Assignments kept in expression form (see `CompiledUpdateAssignment`).
    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    /// Row-update contract plan, when one applies to the target.
    row_contract_plan: Option<RowUpdateContractPlan>,
    /// Names of the row columns this plan modifies.
    row_modified_columns: Vec<String>,
    /// Whether any modified row column is presumably covered by a uniqueness
    /// constraint — confirm where the flag is computed.
    row_touches_unique_columns: bool,
}
58
/// Concrete values produced for the dynamic assignments of an UPDATE.
///
/// `Default` yields two empty lists.
/// NOTE(review): presumably filled per row by evaluating
/// `CompiledUpdatePlan::dynamic_assignments` — the evaluator is outside this
/// view, confirm.
#[derive(Default)]
struct MaterializedUpdateAssignments {
    /// `(column, value)` pairs for row fields.
    dynamic_field_assignments: Vec<(String, Value)>,
    /// `(key, value)` pairs for metadata.
    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
}
64
65impl RedDBRuntime {
66    /// Issue #524 — public read of the in-memory chain tip. Returns `None`
67    /// when the collection is not a chain or has no rows (pre-genesis). On a
68    /// cold cache the first call falls back to a one-time scan so the HTTP
69    /// `GET /collections/:name/chain-tip` handler stays consistent with the
70    /// INSERT path after a restart.
71    pub fn chain_tip_for_collection(
72        &self,
73        collection: &str,
74    ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75        let store = self.inner.db.store();
76        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77            return None;
78        }
79        let mut cache = self.inner.chain_tip_cache.lock();
80        if let Some(existing) = cache.get(collection) {
81            return Some(existing.clone());
82        }
83        let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84        cache.insert(collection.to_string(), scanned.clone());
85        Some(scanned)
86    }
87
88    /// Issue #525 — walks the chain end-to-end, recomputes each block's hash
89    /// against the stored fields, and returns the verification outcome.  On
90    /// `ok == false` the integrity flag is persisted and the in-memory cache
91    /// is updated so subsequent INSERTs surface `ChainIntegrityBroken`.
92    ///
93    /// Returns `None` when the collection is absent or not a `KIND blockchain`.
94    pub fn verify_chain_for_collection(
95        &self,
96        collection: &str,
97    ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98        let store = self.inner.db.store();
99        let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100        if !outcome.ok {
101            crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102            self.inner
103                .chain_integrity_broken
104                .lock()
105                .insert(collection.to_string(), true);
106        }
107        Some(outcome)
108    }
109
110    /// Issue #525 — admin clears the `ChainIntegrityBroken` flag so the chain
111    /// accepts INSERTs again.  Returns `false` when the collection is not a
112    /// chain.
113    pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114        let store = self.inner.db.store();
115        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116            return false;
117        }
118        crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119        self.inner
120            .chain_integrity_broken
121            .lock()
122            .insert(collection.to_string(), false);
123        true
124    }
125
126    /// Issue #525 — INSERT-time check.  Combines in-memory cache (fast path)
127    /// with a one-time scan of `red_config` on cold start so the flag survives
128    /// restart.
129    fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130        {
131            let cache = self.inner.chain_integrity_broken.lock();
132            if let Some(v) = cache.get(collection) {
133                return *v;
134            }
135        }
136        let store = self.inner.db.store();
137        let persisted =
138            crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139                .unwrap_or(false);
140        self.inner
141            .chain_integrity_broken
142            .lock()
143            .insert(collection.to_string(), persisted);
144        persisted
145    }
146
147    /// Issue #765 / S6 — lazily hydrate the integrity-tombstone cache from
148    /// `red_config` on first access. Returns `true` when at least one
149    /// tombstone range is present. Subsequent calls observe the cached state
150    /// flag (`1` empty / `2` present) and skip the store scan.
151    fn ensure_integrity_tombstones_loaded(&self) -> bool {
152        use std::sync::atomic::Ordering;
153        match self
154            .inner
155            .integrity_tombstones_state
156            .load(Ordering::Relaxed)
157        {
158            1 => return false,
159            2 => return true,
160            _ => {}
161        }
162        // Cold: load under the cache lock so a concurrent reader cannot
163        // observe a half-populated vector.
164        let mut guard = self.inner.integrity_tombstones.lock();
165        if self
166            .inner
167            .integrity_tombstones_state
168            .load(Ordering::Relaxed)
169            == 0
170        {
171            let ranges = crate::runtime::integrity_tombstone::load_ranges(&self.inner.db.store());
172            let present = !ranges.is_empty();
173            *guard = ranges;
174            self.inner
175                .integrity_tombstones_state
176                .store(if present { 2 } else { 1 }, Ordering::Relaxed);
177        }
178        self.inner
179            .integrity_tombstones_state
180            .load(Ordering::Relaxed)
181            == 2
182    }
183
184    /// Issue #765 / S6 — durably record an integrity tombstone over the
185    /// inclusive RID range `[lo, hi]` of `table` (the committed rows of an
186    /// input stream whose end-to-end SHA-256 digest did not match). The range
187    /// is persisted to `red_config` (survives restart) and folded into the
188    /// in-memory cache so the same process filters it immediately.
189    pub fn record_integrity_tombstone(&self, table: &str, lo: u64, hi: u64) {
190        use std::sync::atomic::Ordering;
191        self.ensure_integrity_tombstones_loaded();
192        let mut guard = self.inner.integrity_tombstones.lock();
193        guard.push(crate::runtime::integrity_tombstone::TombstoneRange::new(
194            table.to_string(),
195            lo,
196            hi,
197        ));
198        crate::runtime::integrity_tombstone::persist_ranges(&self.inner.db.store(), &guard);
199        self.inner
200            .integrity_tombstones_state
201            .store(2, Ordering::Relaxed);
202    }
203
204    /// Issue #765 / S6 — snapshot of the currently-cached tombstone ranges.
205    /// Intended for tests and forensic surfaces; the read path uses
206    /// [`Self::filter_integrity_tombstoned`] which avoids the clone.
207    pub fn integrity_tombstone_ranges(
208        &self,
209    ) -> Vec<crate::runtime::integrity_tombstone::TombstoneRange> {
210        self.ensure_integrity_tombstones_loaded();
211        self.inner.integrity_tombstones.lock().clone()
212    }
213
214    /// Issue #765 / S6 — drop tombstoned rows from a SELECT result in place.
215    /// Fast no-op (one relaxed atomic load) when no tombstone has ever been
216    /// recorded. Clears `pre_serialized_json` when any row is removed so the
217    /// fast-path JSON cannot leak a filtered row back onto the wire.
218    pub fn filter_integrity_tombstoned(&self, result: &mut UnifiedResult) {
219        if !self.ensure_integrity_tombstones_loaded() {
220            return;
221        }
222        let guard = self.inner.integrity_tombstones.lock();
223        if guard.is_empty() {
224            return;
225        }
226        let before = result.records.len();
227        result.records.retain(|record| {
228            !crate::runtime::integrity_tombstone::record_tombstoned(&guard, record)
229        });
230        if result.records.len() != before {
231            result.pre_serialized_json = None;
232        }
233    }
234
235    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
236    /// target table is tenant-scoped and the user's column list does
237    /// not already name the tenant column.
238    ///
239    /// Returns:
240    /// * `Ok(None)` — no injection needed (non-tenant table, or user
241    ///   supplied the column explicitly). Caller uses the original
242    ///   query unchanged.
243    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
244    ///   + literal value appended to every row.
245    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
246    ///   the current session. Fails loudly so callers don't produce
247    ///   rows that RLS would then hide on read.
248    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
249        let Some(tenant_col) = self.tenant_column(&query.table) else {
250            return Ok(None);
251        };
252        // User already named the column (literal match) — trust them.
253        if query
254            .columns
255            .iter()
256            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
257        {
258            return Ok(None);
259        }
260
261        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
262        // nested key like `headers.tenant` we operate on the root
263        // column (`headers`) and set / add the nested path inside its
264        // JSON value. If the user named the root column we mutate in
265        // place; otherwise we create a fresh JSON column for every row.
266        if let Some(dot_pos) = tenant_col.find('.') {
267            let (root, tail) = tenant_col.split_at(dot_pos);
268            let tail = &tail[1..]; // drop leading '.'
269            return self.inject_dotted_tenant(query, root, tail);
270        }
271
272        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
273            return Err(RedDBError::Query(format!(
274                "INSERT into tenant-scoped table '{}' requires an active tenant — \
275                 run SET TENANT '<id>' first or name column '{}' explicitly",
276                query.table, tenant_col
277            )));
278        };
279
280        let mut augmented = query.clone();
281        augmented.columns.push(tenant_col);
282        let lit = Value::text(tenant_id.clone());
283        for row in augmented.values.iter_mut() {
284            row.push(lit.clone());
285        }
286        for row in augmented.value_exprs.iter_mut() {
287            row.push(crate::storage::query::ast::Expr::Literal {
288                value: lit.clone(),
289                span: crate::storage::query::ast::Span::synthetic(),
290            });
291        }
292        Ok(Some(augmented))
293    }
294
295    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
296    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
297    /// nested JSON instead of appending a flat column.
298    ///
299    /// Cases:
300    /// * Root column already in the INSERT list → mutate per-row JSON
301    ///   (parse, set path, re-serialize).
302    /// * Root column absent → create a fresh `{tail: tenant}` JSON
303    ///   object and append the root column to the INSERT.
304    fn inject_dotted_tenant(
305        &self,
306        query: &InsertQuery,
307        root: &str,
308        tail: &str,
309    ) -> RedDBResult<Option<InsertQuery>> {
310        let active_tenant = crate::runtime::impl_core::current_tenant();
311        let mut augmented = query.clone();
312        let root_idx = augmented
313            .columns
314            .iter()
315            .position(|c| c.eq_ignore_ascii_case(root));
316
317        if let Some(idx) = root_idx {
318            // User supplied the root column. Per-row: if the dotted
319            // tail is already present we trust the user (admin / bulk
320            // loader scenario); otherwise fill from the active
321            // tenant. An unbound tenant is only an error when some
322            // row actually needs filling.
323            for row in augmented.values.iter_mut() {
324                let Some(slot) = row.get_mut(idx) else {
325                    continue;
326                };
327                if dotted_tail_already_set(slot, tail) {
328                    continue;
329                }
330                let Some(tenant_id) = &active_tenant else {
331                    return Err(RedDBError::Query(format!(
332                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
333                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
334                        query.table, root, tail
335                    )));
336                };
337                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
338            }
339            // Expression row is kept in sync by re-wrapping the
340            // mutated literal; the canonical path will re-evaluate
341            // against the same JSON shape.
342            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
343                if let Some(slot) = row.get_mut(idx) {
344                    let new_value = augmented
345                        .values
346                        .get(row_idx)
347                        .and_then(|v| v.get(idx))
348                        .cloned()
349                        .unwrap_or(Value::Null);
350                    *slot = crate::storage::query::ast::Expr::Literal {
351                        value: new_value,
352                        span: crate::storage::query::ast::Span::synthetic(),
353                    };
354                }
355            }
356        } else {
357            // No root column in the INSERT list — auto-fill needs a
358            // bound tenant to synthesise one. Error loud so we never
359            // create a tenant-less row that RLS would then hide.
360            let Some(tenant_id) = &active_tenant else {
361                return Err(RedDBError::Query(format!(
362                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
363                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
364                    query.table, root, tail
365                )));
366            };
367            // Create a fresh JSON column with only the tenant path set.
368            augmented.columns.push(root.to_string());
369            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
370            for row in augmented.values.iter_mut() {
371                row.push(fresh.clone());
372            }
373            for row in augmented.value_exprs.iter_mut() {
374                row.push(crate::storage::query::ast::Expr::Literal {
375                    value: fresh.clone(),
376                    span: crate::storage::query::ast::Span::synthetic(),
377                });
378            }
379        }
380
381        Ok(Some(augmented))
382    }
383
384    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
385    /// `lsns` is empty because events fire at commit time.
386    fn delete_entities_batch(
387        &self,
388        collection: &str,
389        ids: &[EntityId],
390    ) -> RedDBResult<(u64, Vec<u64>)> {
391        if ids.is_empty() {
392            return Ok((0, vec![]));
393        }
394
395        let store = self.db().store();
396        let Some(manager) = store.get_collection(collection) else {
397            return Ok((0, vec![]));
398        };
399
400        let active_xid = self.current_xid();
401        let conn_id = crate::runtime::impl_core::current_connection_id();
402        let mut autocommit_xid = None;
403        let mut tombstoned_ids = Vec::new();
404        let mut tombstoned_entities = Vec::new();
405        let mut physical_delete_ids = Vec::new();
406        let table_row_resolver =
407            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
408
409        for &id in ids {
410            let Some(mut entity) = manager.get(id) else {
411                continue;
412            };
413            if matches!(entity.data, EntityData::Row(_)) {
414                let previous_xmax = entity.xmax;
415                if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
416                    if table_row_resolver.resolve_candidate(&entity).is_none() {
417                        continue;
418                    }
419                } else if entity.xmax != 0 {
420                    continue;
421                }
422
423                let xid = match active_xid {
424                    Some(xid) => xid,
425                    None => match autocommit_xid {
426                        Some(xid) => xid,
427                        None => {
428                            let mgr = self.snapshot_manager();
429                            let xid = mgr.begin();
430                            autocommit_xid = Some(xid);
431                            xid
432                        }
433                    },
434                };
435                entity.set_xmax(xid);
436                if manager.update(entity.clone()).is_ok() {
437                    if active_xid.is_some() {
438                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
439                    }
440                    tombstoned_entities.push(entity);
441                    tombstoned_ids.push(id);
442                }
443            } else {
444                physical_delete_ids.push(id);
445            }
446        }
447
448        if let Some(xid) = autocommit_xid {
449            self.snapshot_manager().commit(xid);
450        }
451
452        let mut affected = tombstoned_ids.len() as u64;
453        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
454        if active_xid.is_some() {
455            store
456                .persist_entities_to_pager(collection, &tombstoned_entities)
457                .map_err(|err| RedDBError::Internal(err.to_string()))?;
458        } else {
459            store
460                .persist_entities_to_pager(collection, &tombstoned_entities)
461                .map_err(|err| RedDBError::Internal(err.to_string()))?;
462            for id in &tombstoned_ids {
463                store.context_index().remove_entity(*id);
464                let lsn = self.cdc_emit(
465                    crate::replication::cdc::ChangeOperation::Delete,
466                    collection,
467                    id.raw(),
468                    "entity",
469                );
470                lsns.push(lsn);
471            }
472        }
473
474        let deleted_ids = store
475            .delete_batch(collection, &physical_delete_ids)
476            .map_err(|err| RedDBError::Internal(err.to_string()))?;
477        affected += deleted_ids.len() as u64;
478        for id in &deleted_ids {
479            store.context_index().remove_entity(*id);
480            let lsn = self.cdc_emit(
481                crate::replication::cdc::ChangeOperation::Delete,
482                collection,
483                id.raw(),
484                "entity",
485            );
486            lsns.push(lsn);
487        }
488
489        Ok((affected, lsns))
490    }
491
492    /// Flushes context-index updates and CDC for each applied mutation.
493    /// Returns one LSN per entity in the same order as `applied`.
494    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
495        if applied.is_empty() {
496            return Ok(Vec::new());
497        }
498
499        let store = self.db().store();
500        if applied.iter().any(|item| item.context_index_dirty) {
501            store.context_index().index_entities(
502                &applied[0].collection,
503                applied
504                    .iter()
505                    .filter(|item| item.context_index_dirty)
506                    .map(|item| &item.entity),
507            );
508        }
509
510        for item in applied {
511            self.refresh_update_secondary_indexes(item)?;
512        }
513
514        let mut lsns = Vec::with_capacity(applied.len());
515        for item in applied {
516            let lsn = self.cdc_emit_prebuilt(
517                crate::replication::cdc::ChangeOperation::Update,
518                &item.collection,
519                &item.entity,
520                update_cdc_item_kind(self, &item.collection, &item.entity),
521                item.metadata.as_ref(),
522                false,
523            );
524            lsns.push(lsn);
525        }
526        Ok(lsns)
527    }
528
    /// Persists one chunk of applied UPDATE mutations by delegating to
    /// `persist_applied_entity_mutations` and returning its result unchanged.
    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
        self.persist_applied_entity_mutations(applied)
    }
532
533    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
534        if applied.pre_mutation_fields.is_empty() {
535            return Ok(());
536        }
537        let post = entity_row_fields_snapshot(&applied.entity);
538        if post.is_empty() {
539            return Ok(());
540        }
541
542        let indexed_cols = self
543            .index_store_ref()
544            .indexed_columns_set(&applied.collection);
545        if indexed_cols.is_empty() {
546            return Ok(());
547        }
548
549        if let Some(old_version) = applied.replaced_entity.as_ref() {
550            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
551                .pre_mutation_fields
552                .iter()
553                .filter(|(col, _)| indexed_cols.contains(col))
554                .cloned()
555                .collect();
556            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
557                .iter()
558                .filter(|(col, _)| indexed_cols.contains(col))
559                .cloned()
560                .collect();
561            if !old_index_fields.is_empty() {
562                self.index_store_ref()
563                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
564                    .map_err(crate::RedDBError::Internal)?;
565            }
566            if !new_index_fields.is_empty() {
567                self.index_store_ref()
568                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
569                    .map_err(crate::RedDBError::Internal)?;
570            }
571            return Ok(());
572        }
573
574        let damage =
575            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
576        if damage
577            .touched_columns()
578            .into_iter()
579            .any(|col| indexed_cols.contains(col))
580        {
581            self.index_store_ref()
582                .index_entity_update(
583                    &applied.collection,
584                    applied.id,
585                    &applied.pre_mutation_fields,
586                    &post,
587                )
588                .map_err(crate::RedDBError::Internal)?;
589        }
590        Ok(())
591    }
592
593    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
594    ///
595    /// Each row in `query.values` is zipped with `query.columns` to produce a
596    /// set of named fields, which is then dispatched based on entity_type.
597    pub fn execute_insert(
598        &self,
599        raw_query: &str,
600        query: &InsertQuery,
601    ) -> RedDBResult<RuntimeQueryResult> {
602        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
603        // CollectionContract gate (#49): single entry point for the
604        // operator's collection-level write rules. Today this is a
605        // no-op for INSERT (APPEND ONLY permits insert); routing
606        // through the gate now means future contract bits — versioned,
607        // vault-only writes — plug in once instead of per verb.
608        crate::runtime::collection_contract::CollectionContractGate::check(
609            self,
610            &query.table,
611            crate::runtime::collection_contract::MutationKind::Insert,
612        )?;
613        // Phase 2.5.4 table-scoped tenancy: if the target table is
614        // tenant-scoped and the user didn't name the tenant column,
615        // auto-inject it with the thread-local `CURRENT_TENANT()`
616        // value. When the column is named explicitly we trust the
617        // caller (useful for admin tooling that writes on behalf of
618        // specific tenants). An unbound tenant on an implicit-fill
619        // path errors up front rather than producing a row the RLS
620        // policy would silently hide.
621        let augmented_owned;
622        let query = match self.maybe_inject_tenant_column(query)? {
623            Some(new_q) => {
624                augmented_owned = new_q;
625                &augmented_owned
626            }
627            None => query,
628        };
629        self.check_insert_column_policy(query)?;
630        if let Some(ref embed_config) = query.auto_embed {
631            let provider = crate::ai::parse_provider(&embed_config.provider)?;
632            // S3 / #711: planner-level provider gate. Runs before the
633            // local-model preflight and the API-key resolver so neither
634            // side-effect fires when policy denies.
635            crate::runtime::ai::provider_gate::enforce(self, &provider)?;
636            if matches!(provider, crate::ai::AiProvider::Local) {
637                crate::runtime::ai::local_embedding::ensure_local_embedding_available()?;
638                // Issue #682 — pre-flight the local model registry before
639                // any row write. Missing model, uninstalled artifacts,
640                // wrong task, and disabled-feature failures surface as
641                // deterministic errors that leave the target collection
642                // untouched, satisfying the "no partial writes on
643                // embedding failure" criterion for the failure modes
644                // owned by the local provider.
645                let model_name = embed_config.model.as_deref().map(str::trim).unwrap_or("");
646                if model_name.is_empty() {
647                    return Err(RedDBError::Query(
648                        "AUTO EMBED with provider=local requires MODEL '<registered-model-name>'; \
649                         the local provider does not have an implicit default model"
650                            .to_string(),
651                    ));
652                }
653                crate::runtime::ai::local_embedding::preflight_local_embedding(
654                    &self.inner.db,
655                    model_name,
656                )?;
657            }
658        }
659
660        let mut inserted_count: u64 = 0;
661        let effective_rows =
662            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
663
664        // Ensure the collection exists (auto-create on first insert).
665        let store = self.inner.db.store();
666        let _ = store.get_or_create_collection(&query.table);
667        let declared_model = self
668            .db()
669            .collection_contract_arc(&query.table)
670            .map(|contract| contract.declared_model);
671
672        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
673            if query.returning.is_some() {
674                Some(Vec::with_capacity(effective_rows.len()))
675            } else {
676                None
677            };
678        let mut returning_result: Option<UnifiedResult> = None;
679
680        if matches!(query.entity_type, InsertEntityType::Row)
681            && !matches!(
682                declared_model,
683                Some(crate::catalog::CollectionModel::TimeSeries)
684            )
685        {
686            // Issue #523 + #524: blockchain collections seal each row into the
687            // chain. When the caller omits the reserved columns, the engine
688            // auto-fills (#523). When the caller supplies any reserved column,
689            // the values are validated against the current tip and a mismatch
690            // surfaces a `BlockchainConflict:` error mapped to HTTP 409 (#524).
691            //
692            // The whole batch runs under a per-collection chain lock so two
693            // concurrent submitters can't both bind to the same prev_hash —
694            // the loser observes the advanced tip and gets 409 with the new
695            // tip so it can retry.
696            let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
697            let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
698                Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
699            } else {
700                None
701            };
702            let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
703
704            // Issue #525 — refuse new blocks if the chain has been marked
705            // `integrity = broken` until an admin clears the flag.
706            if chain_mode && self.is_chain_integrity_broken(&query.table) {
707                return Err(RedDBError::InvalidOperation(format!(
708                    "ChainIntegrityBroken: collection '{}' is locked until \
709                     POST /collections/{}/clear-integrity-flag is called by an admin",
710                    query.table, query.table
711                )));
712            }
713
714            // Pull the tip from the in-memory cache; fall back to a one-time
715            // scan if the cache hasn't seen this collection yet (cold start
716            // after restart). Cache is updated below as rows are sealed.
717            let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
718                if chain_mode {
719                    let mut cache = self.inner.chain_tip_cache.lock();
720                    if let Some(existing) = cache.get(&query.table) {
721                        Some(existing.clone())
722                    } else if let Some(scanned) =
723                        crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
724                    {
725                        cache.insert(query.table.clone(), scanned.clone());
726                        Some(scanned)
727                    } else {
728                        None
729                    }
730                } else {
731                    None
732                };
733
734            let mut rows = Vec::with_capacity(effective_rows.len());
735            for row_values in &effective_rows {
736                if row_values.len() != query.columns.len() {
737                    return Err(RedDBError::Query(format!(
738                        "INSERT column count ({}) does not match value count ({})",
739                        query.columns.len(),
740                        row_values.len()
741                    )));
742                }
743                let (mut fields, mut metadata) =
744                    split_insert_metadata(self, &query.columns, row_values)?;
745                if chain_mode {
746                    use crate::runtime::blockchain_kind::{
747                        chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
748                        COL_TIMESTAMP, RESERVED_COLUMNS,
749                    };
750                    let supplied_height = fields
751                        .iter()
752                        .find(|(k, _)| k == COL_BLOCK_HEIGHT)
753                        .map(|(_, v)| v.clone());
754                    let supplied_prev = fields
755                        .iter()
756                        .find(|(k, _)| k == COL_PREV_HASH)
757                        .map(|(_, v)| v.clone());
758                    let supplied_ts = fields
759                        .iter()
760                        .find(|(k, _)| k == COL_TIMESTAMP)
761                        .map(|(_, v)| v.clone());
762                    let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
763                    let user_supplied_any = supplied_height.is_some()
764                        || supplied_prev.is_some()
765                        || supplied_ts.is_some()
766                        || supplied_hash;
767
768                    fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
769                    let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
770
771                    let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
772                        Some(t) => (t.hash, t.height + 1),
773                        None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
774                    };
775                    let server_now = crate::runtime::blockchain_kind::now_ms();
776
777                    let (use_prev, use_height, use_ts) = if user_supplied_any {
778                        // Caller is participating in the chain protocol —
779                        // every field must be supplied AND match the tip.
780                        if supplied_hash {
781                            return Err(chain_conflict_error(
782                                tip_next_height.saturating_sub(1),
783                                tip_prev_hash,
784                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
785                                server_now,
786                                "hash column is engine-computed and cannot be supplied",
787                            ));
788                        }
789                        let caller_prev = match &supplied_prev {
790                            Some(Value::Blob(b)) if b.len() == 32 => {
791                                let mut a = [0u8; 32];
792                                a.copy_from_slice(b);
793                                a
794                            }
795                            Some(Value::Text(s)) if s.len() == 64 => {
796                                // Accept hex-encoded prev_hash so JSON / SQL
797                                // callers without literal-blob syntax can
798                                // still participate in the chain protocol.
799                                let mut a = [0u8; 32];
800                                let mut ok = true;
801                                for (i, slot) in a.iter_mut().enumerate() {
802                                    let pair = &s.as_ref()[i * 2..i * 2 + 2];
803                                    match u8::from_str_radix(pair, 16) {
804                                        Ok(byte) => *slot = byte,
805                                        Err(_) => {
806                                            ok = false;
807                                            break;
808                                        }
809                                    }
810                                }
811                                if !ok {
812                                    return Err(chain_conflict_error(
813                                        tip_next_height.saturating_sub(1),
814                                        tip_prev_hash,
815                                        chain_tip_full
816                                            .as_ref()
817                                            .map(|t| t.timestamp_ms)
818                                            .unwrap_or(0),
819                                        server_now,
820                                        "prev_hash is not valid hex",
821                                    ));
822                                }
823                                a
824                            }
825                            _ => {
826                                return Err(chain_conflict_error(
827                                    tip_next_height.saturating_sub(1),
828                                    tip_prev_hash,
829                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
830                                    server_now,
831                                    "prev_hash missing or not a 32-byte Blob",
832                                ));
833                            }
834                        };
835                        if caller_prev != tip_prev_hash {
836                            return Err(chain_conflict_error(
837                                tip_next_height.saturating_sub(1),
838                                tip_prev_hash,
839                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
840                                server_now,
841                                "prev_hash does not match current tip",
842                            ));
843                        }
844                        let caller_height = match &supplied_height {
845                            Some(Value::UnsignedInteger(v)) => *v,
846                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
847                            _ => {
848                                return Err(chain_conflict_error(
849                                    tip_next_height.saturating_sub(1),
850                                    tip_prev_hash,
851                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
852                                    server_now,
853                                    "block_height missing or not an unsigned integer",
854                                ));
855                            }
856                        };
857                        if caller_height != tip_next_height {
858                            return Err(chain_conflict_error(
859                                tip_next_height.saturating_sub(1),
860                                tip_prev_hash,
861                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
862                                server_now,
863                                "block_height does not match tip+1",
864                            ));
865                        }
866                        let caller_ts = match &supplied_ts {
867                            Some(Value::UnsignedInteger(v)) => *v,
868                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
869                            _ => {
870                                return Err(chain_conflict_error(
871                                    tip_next_height.saturating_sub(1),
872                                    tip_prev_hash,
873                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
874                                    server_now,
875                                    "timestamp missing or not an unsigned integer",
876                                ));
877                            }
878                        };
879                        let drift = (caller_ts as i128) - (server_now as i128);
880                        if drift.abs() > 60_000 {
881                            return Err(chain_conflict_error(
882                                tip_next_height.saturating_sub(1),
883                                tip_prev_hash,
884                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
885                                server_now,
886                                "timestamp outside ±60s of server_time",
887                            ));
888                        }
889                        (caller_prev, caller_height, caller_ts)
890                    } else {
891                        (tip_prev_hash, tip_next_height, server_now)
892                    };
893
894                    let (reserved, new_hash) =
895                        crate::runtime::blockchain_kind::make_block_reserved_fields(
896                            use_prev, use_height, use_ts, &payload,
897                        );
898                    fields.extend(reserved);
899                    chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
900                        height: use_height,
901                        hash: new_hash,
902                        timestamp_ms: use_ts,
903                    });
904                }
905                // Issue #522 — signed-writes verification. On collections
906                // created with `SIGNED_BY (...)` the row must carry valid
907                // `signer_pubkey` + `signature` reserved columns. Runs
908                // after chain_mode so canonical payload covers user-supplied
909                // fields only (blockchain reserved columns are filtered by
910                // `canonical_payload`; the two signed-writes reserved
911                // columns are split out before payload computation, then
912                // re-attached for storage). The blockchain + SIGNED_BY
913                // composition is owned by issue #526; we keep #522 to the
914                // non-chain path and let chain_mode collections punt to that
915                // slice rather than half-wire it here.
916                if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
917                    let (pk_col, sig_col, residual) =
918                        crate::runtime::signed_writes_kind::split_signature_fields(fields);
919                    let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
920                    let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
921                    crate::runtime::signed_writes_kind::verify_row(
922                        &reg,
923                        pk_col.as_ref().map(|c| c.bytes.as_slice()),
924                        sig_col.as_ref().map(|c| c.bytes.as_slice()),
925                        &payload,
926                    )
927                    .map_err(crate::runtime::signed_writes_kind::map_error)?;
928                    fields = residual;
929                    // Round-trip the reserved columns with the value
930                    // type the caller supplied (Text/hex on the SQL path,
931                    // Blob on the binary path). Keeps SELECT and WHERE
932                    // predicates symmetric with the INSERT shape.
933                    if let Some(col) = pk_col {
934                        fields.push((
935                            crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
936                            col.raw_value,
937                        ));
938                    }
939                    if let Some(col) = sig_col {
940                        fields.push((
941                            crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
942                            col.raw_value,
943                        ));
944                    }
945                }
946                merge_with_clauses(
947                    &mut metadata,
948                    query.ttl_ms,
949                    query.expires_at_ms,
950                    &query.with_metadata,
951                );
952                if let Some(snaps) = returning_snapshots.as_mut() {
953                    snaps.push(fields.clone());
954                }
955                rows.push(CreateRowInput {
956                    collection: query.table.clone(),
957                    fields,
958                    metadata,
959                    node_links: Vec::new(),
960                    vector_links: Vec::new(),
961                });
962            }
963            let outputs = self.create_rows_batch(CreateRowsBatchInput {
964                collection: query.table.clone(),
965                rows,
966                suppress_events: query.suppress_events,
967            })?;
968            inserted_count = outputs.len() as u64;
969
970            // Chain mode: commit the new tip to the in-memory cache only after
971            // the batch persisted successfully. If the batch threw mid-way the
972            // cache stays on the previous tip and the chain lock releases.
973            if chain_mode {
974                if let Some(new_tip) = chain_tip_full.as_ref() {
975                    self.inner
976                        .chain_tip_cache
977                        .lock()
978                        .insert(query.table.clone(), new_tip.clone());
979                }
980            }
981
982            // Hypertable chunk routing: if this table was declared via
983            // CREATE HYPERTABLE, register each row's time-column value
984            // with the registry so chunk metadata (bounds, row counts,
985            // TTL eligibility) stays current. This is what lets
986            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
987            // retention daemon sweep expired chunks without scanning
988            // every row.
989            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
990                let time_col = &spec.time_column;
991                // Find the column's index in the INSERT column list.
992                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
993                    for row in &effective_rows {
994                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
995                            if *n >= 0 {
996                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
997                            }
998                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
999                            let _ = self.inner.db.hypertables().route(&query.table, *n);
1000                        }
1001                    }
1002                }
1003            }
1004
1005            if let (Some(items), Some(snaps)) =
1006                (query.returning.as_ref(), returning_snapshots.take())
1007            {
1008                let snaps = row_insert_returning_snapshots(&outputs, snaps);
1009                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
1010            }
1011        } else {
1012            // Issue #419: surface the inserted entity id on every INSERT path.
1013            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
1014            // so a RETURNING clause (and the unconditional inserted_ids list,
1015            // below) can expose the engine-assigned id. TimeSeries (the Row arm
1016            // in this else) still rejects RETURNING with the not-supported error
1017            // because create_timeseries_point isn't plumbed through this fn.
1018            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
1019                Vec::with_capacity(effective_rows.len());
1020            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
1021            {
1022                Vec::with_capacity(effective_rows.len())
1023            } else {
1024                Vec::new()
1025            };
1026            if matches!(
1027                query.entity_type,
1028                InsertEntityType::Node | InsertEntityType::Edge
1029            ) {
1030                enum PreparedGraphInsert {
                        // One NODE or EDGE row of the INSERT, staged after its per-row
                        // checks passed. Every row is prepared first; the storage batch is
                        // only assembled from these values afterwards, so a row that fails
                        // a check returns an error before any batch entry is added.
                        //
                        // In both variants `fields` is the column/value list returned by
                        // `split_insert_metadata` for the row, and `input` is the typed
                        // create request whose parts are handed to the batch builder.

                        // A graph node row (`InsertEntityType::Node`).
1031                    Node {
                            // Pushed to the RETURNING snapshot list when the query has a
                            // RETURNING clause.
1032                        fields: Vec<(String, Value)>,
                            // Collection, label, optional node type, properties and metadata.
1033                        input: CreateNodeInput,
1034                    },
                        // A graph edge row (`InsertEntityType::Edge`).
1035                    Edge {
                            // Pushed to the RETURNING snapshot list when the query has a
                            // RETURNING clause.
1036                        fields: Vec<(String, Value)>,
                            // Collection, label, resolved endpoints, optional weight,
                            // properties and metadata.
1037                        input: CreateEdgeInput,
1038                    },
1039                }
1040
1041                let mut prepared = Vec::with_capacity(effective_rows.len());
1042                for row_values in &effective_rows {
1043                    if row_values.len() != query.columns.len() {
1044                        return Err(RedDBError::Query(format!(
1045                            "INSERT column count ({}) does not match value count ({})",
1046                            query.columns.len(),
1047                            row_values.len()
1048                        )));
1049                    }
1050
1051                    match query.entity_type {
1052                        InsertEntityType::Node => {
1053                            let (node_values, mut metadata) =
1054                                split_insert_metadata(self, &query.columns, row_values)?;
1055                            merge_with_clauses(
1056                                &mut metadata,
1057                                query.ttl_ms,
1058                                query.expires_at_ms,
1059                                &query.with_metadata,
1060                            );
1061                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
1062                            apply_collection_default_ttl_metadata(
1063                                self,
1064                                &query.table,
1065                                &mut metadata,
1066                            );
1067                            let (columns, values) = pairwise_columns_values(&node_values);
1068                            let label = find_column_value_string(&columns, &values, "label")?;
1069                            let node_type =
1070                                find_column_value_opt_string(&columns, &values, "node_type");
1071                            let properties = extract_remaining_properties(
1072                                &columns,
1073                                &values,
1074                                &["label", "node_type"],
1075                            );
1076                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1077                                properties.iter().map(|(key, _)| key.as_str()),
1078                                &format!("node '{}'", query.table),
1079                            )?;
1080                            prepared.push(PreparedGraphInsert::Node {
1081                                fields: node_values,
1082                                input: CreateNodeInput {
1083                                    collection: query.table.clone(),
1084                                    label,
1085                                    node_type,
1086                                    properties,
1087                                    metadata,
1088                                    embeddings: Vec::new(),
1089                                    table_links: Vec::new(),
1090                                    node_links: Vec::new(),
1091                                },
1092                            });
1093                        }
1094                        InsertEntityType::Edge => {
1095                            let (edge_values, mut metadata) =
1096                                split_insert_metadata(self, &query.columns, row_values)?;
1097                            merge_with_clauses(
1098                                &mut metadata,
1099                                query.ttl_ms,
1100                                query.expires_at_ms,
1101                                &query.with_metadata,
1102                            );
1103                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
1104                            apply_collection_default_ttl_metadata(
1105                                self,
1106                                &query.table,
1107                                &mut metadata,
1108                            );
1109                            let (columns, values) = pairwise_columns_values(&edge_values);
1110                            let label = find_column_value_string(&columns, &values, "label")?;
1111                            ensure_non_tree_structural_edge_label(&label)?;
1112                            let from_id = resolve_edge_endpoint_any(
1113                                self.inner.db.store().as_ref(),
1114                                &query.table,
1115                                &columns,
1116                                &values,
1117                                &["from_rid", "from"],
1118                            )?;
1119                            let to_id = resolve_edge_endpoint_any(
1120                                self.inner.db.store().as_ref(),
1121                                &query.table,
1122                                &columns,
1123                                &values,
1124                                &["to_rid", "to"],
1125                            )?;
1126                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
1127                            let properties = extract_remaining_properties(
1128                                &columns,
1129                                &values,
1130                                &["label", "from_rid", "to_rid", "from", "to", "weight"],
1131                            );
1132                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1133                                properties.iter().map(|(key, _)| key.as_str()),
1134                                &format!("edge '{}'", query.table),
1135                            )?;
1136                            prepared.push(PreparedGraphInsert::Edge {
1137                                fields: edge_values,
1138                                input: CreateEdgeInput {
1139                                    collection: query.table.clone(),
1140                                    label,
1141                                    from: EntityId::new(from_id),
1142                                    to: EntityId::new(to_id),
1143                                    weight,
1144                                    properties,
1145                                    metadata,
1146                                },
1147                            });
1148                        }
1149                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1150                    }
1151                }
1152
1153                ensure_graph_insert_contract(self, &query.table)?;
1154                let mut batch = self.inner.db.batch();
1155                for item in prepared {
1156                    match item {
1157                        PreparedGraphInsert::Node { fields, input } => {
1158                            if query.returning.is_some() {
1159                                returning_field_snaps.push(fields);
1160                            }
1161                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1162                            batch = batch.add_node_with_type(
1163                                input.collection,
1164                                input.label,
1165                                node_type,
1166                                input.properties.into_iter().collect(),
1167                                input.metadata.into_iter().collect(),
1168                            );
1169                        }
1170                        PreparedGraphInsert::Edge { fields, input } => {
1171                            if query.returning.is_some() {
1172                                returning_field_snaps.push(fields);
1173                            }
1174                            batch = batch.add_edge(
1175                                input.collection,
1176                                input.label,
1177                                input.from,
1178                                input.to,
1179                                input.weight.unwrap_or(1.0),
1180                                input.properties.into_iter().collect(),
1181                                input.metadata.into_iter().collect(),
1182                            );
1183                        }
1184                    }
1185                }
1186                let batch_result = batch
1187                    .execute()
1188                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1189                let (ids, entity_kind) = match query.entity_type {
1190                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1191                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1192                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1193                };
1194                for id in &ids {
1195                    self.stamp_xmin_if_in_txn(&query.table, *id);
1196                }
1197                if query.returning.is_some() {
1198                    returning_field_snaps = graph_insert_returning_snapshots(
1199                        self.inner.db.store().as_ref(),
1200                        &query.table,
1201                        &ids,
1202                    );
1203                }
1204                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1205                let store = self.inner.db.store();
1206                entity_outputs.extend(ids.iter().map(|id| {
1207                    crate::application::entity::CreateEntityOutput {
1208                        id: *id,
1209                        entity: store.get(&query.table, *id),
1210                    }
1211                }));
1212                inserted_count = ids.len() as u64;
1213            } else {
1214                for row_values in &effective_rows {
1215                    if row_values.len() != query.columns.len() {
1216                        return Err(RedDBError::Query(format!(
1217                            "INSERT column count ({}) does not match value count ({})",
1218                            query.columns.len(),
1219                            row_values.len()
1220                        )));
1221                    }
1222
1223                    match query.entity_type {
1224                        InsertEntityType::Row => {
1225                            if query.returning.is_some() {
1226                                return Err(RedDBError::Query(
1227                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1228                                    .to_string(),
1229                            ));
1230                            }
1231                            let (fields, mut metadata) =
1232                                split_insert_metadata(self, &query.columns, row_values)?;
1233                            merge_with_clauses(
1234                                &mut metadata,
1235                                query.ttl_ms,
1236                                query.expires_at_ms,
1237                                &query.with_metadata,
1238                            );
1239                            self.insert_timeseries_point(&query.table, fields, metadata)?;
1240                        }
1241                        InsertEntityType::Node | InsertEntityType::Edge => {
1242                            unreachable!("NODE and EDGE are handled by the prepared graph path")
1243                        }
1244                        InsertEntityType::Vector => {
1245                            let (vector_values, mut metadata) =
1246                                split_insert_metadata(self, &query.columns, row_values)?;
1247                            merge_with_clauses(
1248                                &mut metadata,
1249                                query.ttl_ms,
1250                                query.expires_at_ms,
1251                                &query.with_metadata,
1252                            );
1253                            let (columns, values) = pairwise_columns_values(&vector_values);
1254                            let dense = find_column_value_vec_f32_any(
1255                                &columns,
1256                                &values,
1257                                &["dense", "embedding"],
1258                            )?;
1259                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1260                            let content =
1261                                find_column_value_opt_string(&columns, &values, "content");
1262                            if query.returning.is_some() {
1263                                returning_field_snaps.push(vector_values.clone());
1264                            }
1265                            let input = CreateVectorInput {
1266                                collection: query.table.clone(),
1267                                dense,
1268                                content,
1269                                metadata,
1270                                link_row: None,
1271                                link_node: None,
1272                            };
1273                            entity_outputs.push(self.create_vector(input)?);
1274                        }
1275                        InsertEntityType::Document => {
1276                            let (document_values, mut metadata) =
1277                                split_insert_metadata(self, &query.columns, row_values)?;
1278                            merge_with_clauses(
1279                                &mut metadata,
1280                                query.ttl_ms,
1281                                query.expires_at_ms,
1282                                &query.with_metadata,
1283                            );
1284                            let (columns, values) = pairwise_columns_values(&document_values);
1285                            let body = find_document_body_json(&columns, &values)?;
1286                            let input = CreateDocumentInput {
1287                                collection: query.table.clone(),
1288                                body,
1289                                metadata,
1290                                node_links: Vec::new(),
1291                                vector_links: Vec::new(),
1292                            };
1293                            let output = self.create_document(input)?;
1294                            if query.returning.is_some() {
1295                                let fields = output
1296                                    .entity
1297                                    .as_ref()
1298                                    .map(entity_row_fields_snapshot)
1299                                    .filter(|fields| !fields.is_empty())
1300                                    .unwrap_or(document_values);
1301                                returning_field_snaps.push(fields);
1302                            }
1303                            entity_outputs.push(output);
1304                        }
1305                        InsertEntityType::Kv => {
1306                            let (kv_values, mut metadata) =
1307                                split_insert_metadata(self, &query.columns, row_values)?;
1308                            merge_with_clauses(
1309                                &mut metadata,
1310                                query.ttl_ms,
1311                                query.expires_at_ms,
1312                                &query.with_metadata,
1313                            );
1314                            let (columns, values) = pairwise_columns_values(&kv_values);
1315                            let key = find_column_value_string(&columns, &values, "key")?;
1316                            let value = find_column_value(&columns, &values, "value")?;
1317                            if query.returning.is_some() {
1318                                returning_field_snaps.push(kv_values.clone());
1319                            }
1320                            let input = CreateKvInput {
1321                                collection: query.table.clone(),
1322                                key,
1323                                value,
1324                                metadata,
1325                            };
1326                            entity_outputs.push(self.create_kv(input)?);
1327                        }
1328                    }
1329
1330                    inserted_count += 1;
1331                }
1332            }
1333
1334            if let Some(items) = query.returning.as_ref() {
1335                if !entity_outputs.is_empty() {
1336                    returning_result = Some(build_returning_result(
1337                        items,
1338                        &returning_field_snaps,
1339                        Some(&entity_outputs),
1340                    ));
1341                }
1342            }
1343        }
1344
1345        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
1346        if let Some(ref embed_config) = query.auto_embed {
1347            let store = self.inner.db.store();
1348            let provider = crate::ai::parse_provider(&embed_config.provider)?;
1349            let is_local_provider = matches!(provider, crate::ai::AiProvider::Local);
1350            // Local provider runs in-process — no API key path applies.
1351            // The pre-flight above already required `MODEL '<name>'`
1352            // for the local case, so the unwrap_or default below only
1353            // ever fires for OpenAI-compatible providers.
1354            let api_key = if is_local_provider {
1355                String::new()
1356            } else {
1357                crate::ai::resolve_api_key_from_runtime(&provider, None, self)?
1358            };
1359            let model = embed_config.model.clone().unwrap_or_else(|| {
1360                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1361                    .ok()
1362                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1363            });
1364
1365            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
1366            let manager = store
1367                .get_collection(&query.table)
1368                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1369            let entities = manager.query_all(|_| true);
1370            let recent: Vec<_> = entities
1371                .into_iter()
1372                .rev()
1373                .take(effective_rows.len())
1374                .collect();
1375
1376            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
1377            let entity_combos: Vec<(usize, String)> = recent
1378                .iter()
1379                .enumerate()
1380                .filter_map(|(i, entity)| {
1381                    if let EntityData::Row(ref row) = entity.data {
1382                        if let Some(ref named) = row.named {
1383                            let texts: Vec<String> = embed_config
1384                                .fields
1385                                .iter()
1386                                .filter_map(|field| match named.get(field) {
1387                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1388                                    _ => None,
1389                                })
1390                                .collect();
1391                            if !texts.is_empty() {
1392                                return Some((i, texts.join(" ")));
1393                            }
1394                        }
1395                    }
1396                    None
1397                })
1398                .collect();
1399
1400            if !entity_combos.is_empty() {
1401                // Batch phase: single provider round-trip for all rows.
1402                let batch_texts: Vec<String> =
1403                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
1404
1405                // Issue #682 — when the provider is `local`, bypass
1406                // AiBatchClient (which is HTTP-only) and dispatch
1407                // directly through the in-process local embedding
1408                // backend. All texts go in one call, mirroring the
1409                // single-round-trip shape of the remote path. The
1410                // local backend does not perform intra-batch dedup —
1411                // each input position gets its own row in the output
1412                // — which keeps the per-row "create_vector" loop
1413                // below correct without additional fan-out logic.
1414                let embeddings = if is_local_provider {
1415                    let response = crate::runtime::ai::local_embedding::embed_local_with_db(
1416                        &self.inner.db,
1417                        &model,
1418                        batch_texts,
1419                    )?;
1420                    response.embeddings
1421                } else {
1422                    let batch_client =
1423                        crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1424
1425                    match tokio::runtime::Handle::try_current() {
1426                        Ok(handle) => tokio::task::block_in_place(|| {
1427                            handle.block_on(batch_client.embed_batch(
1428                                &provider,
1429                                &model,
1430                                &api_key,
1431                                batch_texts,
1432                            ))
1433                        }),
1434                        Err(_) => {
1435                            return Err(RedDBError::Query(
1436                                "AUTO EMBED requires a Tokio runtime context".to_string(),
1437                            ));
1438                        }
1439                    }
1440                    .map_err(|e| RedDBError::Query(e.to_string()))?
1441                };
1442
1443                // Distribute phase: persist one vector per non-empty embedding.
1444                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1445                    if dense.is_empty() {
1446                        continue;
1447                    }
1448                    self.create_vector(CreateVectorInput {
1449                        collection: query.table.clone(),
1450                        dense,
1451                        content: Some(combined.clone()),
1452                        metadata: Vec::new(),
1453                        link_row: None,
1454                        link_node: None,
1455                    })?;
1456                }
1457            }
1458        }
1459
1460        if inserted_count > 0 {
1461            self.note_table_write(&query.table);
1462        }
1463
1464        let mut result = RuntimeQueryResult::dml_result(
1465            raw_query.to_string(),
1466            inserted_count,
1467            "insert",
1468            "runtime-dml",
1469        );
1470        if let Some(returning) = returning_result {
1471            result.result = returning;
1472        }
1473        Ok(result)
1474    }
1475
1476    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1477        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1478            return Ok(());
1479        };
1480        if !auth_store.iam_authorization_enabled() {
1481            return Ok(());
1482        }
1483        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1484            return Ok(());
1485        };
1486
1487        let tenant = crate::runtime::impl_core::current_tenant();
1488        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1489        let request = crate::auth::ColumnAccessRequest {
1490            action: "insert".to_string(),
1491            schema: None,
1492            table: query.table.clone(),
1493            columns: query.columns.clone(),
1494        };
1495        let ctx = crate::auth::policies::EvalContext {
1496            principal_tenant: tenant.clone(),
1497            current_tenant: tenant,
1498            peer_ip: None,
1499            mfa_present: false,
1500            now_ms: crate::auth::now_ms(),
1501            principal_is_admin_role: role == crate::auth::Role::Admin,
1502            principal_is_platform_scoped: principal.tenant.is_none(),
1503        };
1504
1505        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1506        let table_allowed = matches!(
1507            outcome.table_decision,
1508            crate::auth::policies::Decision::Allow { .. }
1509                | crate::auth::policies::Decision::AdminBypass
1510        );
1511        if !table_allowed {
1512            return Err(RedDBError::Query(format!(
1513                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1514                outcome.table_resource.kind, outcome.table_resource.name
1515            )));
1516        }
1517        if let Some(denied) = outcome.first_denied_column() {
1518            return Err(RedDBError::Query(format!(
1519                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1520                denied.resource.kind, denied.resource.name
1521            )));
1522        }
1523
1524        Ok(())
1525    }
1526
1527    pub(crate) fn insert_timeseries_point(
1528        &self,
1529        collection: &str,
1530        fields: Vec<(String, Value)>,
1531        mut metadata: Vec<(String, MetadataValue)>,
1532    ) -> RedDBResult<EntityId> {
1533        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1534
1535        let (columns, values) = pairwise_columns_values(&fields);
1536        validate_timeseries_insert_columns(&columns)?;
1537
1538        // Issue #577 — AnalyticsSchemaRegistry hook. If the row carries
1539        // an `event_name` whose schema is registered, validate the
1540        // `payload` JSON against it BEFORE any write side-effect. On
1541        // failure we return a typed error and the row is not
1542        // persisted. When no schema is registered for the event name
1543        // (or no `event_name` column is supplied at all) we fall
1544        // through to the normal write path for back-compat with
1545        // existing timeseries rows.
1546        let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1547        let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1548        if let Some(event_name) = event_name_opt.as_deref() {
1549            let store_for_schema = self.inner.db.store();
1550            if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1551                .is_some()
1552            {
1553                let payload_json = payload_opt.as_deref().unwrap_or("{}");
1554                super::analytics_schema_registry::validate(
1555                    store_for_schema.as_ref(),
1556                    event_name,
1557                    payload_json,
1558                )
1559                .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1560            }
1561        }
1562
1563        // `metric` is required by the existing timeseries write path;
1564        // when an analytics-style row supplies `event_name` but not
1565        // `metric`, fall back to the event name so the storage path
1566        // still has a non-empty metric tag.
1567        let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1568            Some(m) => m,
1569            None => event_name_opt.clone().ok_or_else(|| {
1570                RedDBError::Query(
1571                    "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1572                )
1573            })?,
1574        };
1575        // `value` is optional for analytics-event rows (which are
1576        // semantically counts of 1); default to 1.0 when missing so
1577        // analytics inserts don't have to fabricate a metric value.
1578        let value = match find_column_value_opt_string(&columns, &values, "value") {
1579            Some(s) => s.parse::<f64>().unwrap_or(1.0),
1580            None => columns
1581                .iter()
1582                .position(|c| c.eq_ignore_ascii_case("value"))
1583                .and_then(|i| match &values[i] {
1584                    Value::Float(f) => Some(*f),
1585                    Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1586                    Value::UnsignedInteger(n) => Some(*n as f64),
1587                    _ => None,
1588                })
1589                .unwrap_or(1.0),
1590        };
1591        let timestamp_ns =
1592            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1593        let mut tags = find_timeseries_tags(&columns, &values)?;
1594        if let Some(ref name) = event_name_opt {
1595            tags.entry("event_name".to_string())
1596                .or_insert_with(|| name.clone());
1597        }
1598        if let Some(ref payload) = payload_opt {
1599            tags.entry("payload".to_string())
1600                .or_insert_with(|| payload.clone());
1601        }
1602
1603        let mut entity = UnifiedEntity::new(
1604            EntityId::new(0),
1605            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1606                series: collection.to_string(),
1607                metric: metric.clone(),
1608            })),
1609            EntityData::TimeSeries(crate::storage::TimeSeriesData {
1610                metric,
1611                timestamp_ns,
1612                value,
1613                tags,
1614            }),
1615        );
1616        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
1617        // or an autocommit xid (allocated and committed up-front so
1618        // future snapshots see the row as soon as it lands).
1619        let writer_xid = match self.current_xid() {
1620            Some(xid) => xid,
1621            None => {
1622                let mgr = self.snapshot_manager();
1623                let xid = mgr.begin();
1624                mgr.commit(xid);
1625                xid
1626            }
1627        };
1628        entity.set_xmin(writer_xid);
1629
1630        let store = self.inner.db.store();
1631        let id = store
1632            .insert_auto(collection, entity)
1633            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1634
1635        if !metadata.is_empty() {
1636            let _ = store.set_metadata(
1637                collection,
1638                id,
1639                Metadata::with_fields(metadata.into_iter().collect()),
1640            );
1641        }
1642
1643        self.cdc_emit(
1644            crate::replication::cdc::ChangeOperation::Insert,
1645            collection,
1646            id.raw(),
1647            "timeseries",
1648        );
1649
1650        Ok(id)
1651    }
1652
1653    /// Execute UPDATE table SET col=val, ... WHERE filter
1654    ///
1655    /// Scans the target collection, evaluates the WHERE filter against each
1656    /// record, and patches every matching entity.
1657    pub fn execute_update(
1658        &self,
1659        raw_query: &str,
1660        query: &UpdateQuery,
1661    ) -> RedDBResult<RuntimeQueryResult> {
1662        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1663        // Issue #523 — blockchain collections are immutable. Reject before
1664        // RLS / RETURNING work so the operator sees a clean 409-mapped
1665        // error instead of a partially-applied mutation surface.
1666        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1667            return Err(RedDBError::InvalidOperation(format!(
1668                "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1669                query.table
1670            )));
1671        }
1672        // CollectionContract gate (#50): runs the APPEND ONLY guard
1673        // (and any future contract bits) before RLS / RETURNING work
1674        // so the operator's immutability declaration is honoured
1675        // uniformly and the error message points at the DDL rather
1676        // than at a downstream symptom.
1677        crate::runtime::collection_contract::CollectionContractGate::check(
1678            self,
1679            &query.table,
1680            crate::runtime::collection_contract::MutationKind::Update,
1681        )?;
1682        ensure_update_target_contract(self, &query.table, query.target)?;
1683        ensure_graph_identity_update_target_allowed(query)?;
1684
1685        // Apply RLS augmentation first so every downstream path — plain
1686        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
1687        // same policy-filtered target set. This prevents RETURNING
1688        // from ever exposing rows the UPDATE policy would have
1689        // denied.
1690        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1691        let augmented_query: UpdateQuery;
1692        let effective_query: &UpdateQuery = if rls_gated {
1693            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1694                self,
1695                &query.table,
1696                crate::storage::query::ast::PolicyAction::Update,
1697            );
1698            let Some(policy) = rls_filter else {
1699                // No admitting policy: zero rows affected, empty
1700                // RETURNING (never leak rows the caller can't touch).
1701                let mut response = RuntimeQueryResult::dml_result(
1702                    raw_query.to_string(),
1703                    0,
1704                    "update",
1705                    "runtime-dml-rls",
1706                );
1707                if let Some(items) = query.returning.clone() {
1708                    response.result = build_returning_result(&items, &[], None);
1709                }
1710                return Ok(response);
1711            };
1712            let mut augmented = query.clone();
1713            augmented.filter = Some(match augmented.filter.take() {
1714                Some(existing) => {
1715                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1716                }
1717                None => policy,
1718            });
1719            augmented_query = augmented;
1720            &augmented_query
1721        } else {
1722            query
1723        };
1724
1725        // RETURNING wraps the inner executor and uses the touched-id
1726        // list the inner reports so the post-image reflects exactly
1727        // the rows the UPDATE actually mutated (not whatever a
1728        // separate SELECT might have observed).
1729        if let Some(items) = effective_query.returning.clone() {
1730            let mut inner_query = effective_query.clone();
1731            inner_query.returning = None;
1732            let (mut response, touched_ids) =
1733                self.execute_update_inner_tracked(raw_query, &inner_query)?;
1734
1735            let snapshots = if matches!(
1736                effective_query.target,
1737                UpdateTarget::Nodes | UpdateTarget::Edges
1738            ) {
1739                graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1740            } else {
1741                super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1742                    .row_snapshots(&touched_ids)
1743            };
1744
1745            response.result = build_returning_result(&items, &snapshots, None);
1746            response.engine = "runtime-dml-returning";
1747            return Ok(response);
1748        }
1749
1750        self.execute_update_inner(raw_query, effective_query)
1751    }
1752
1753    /// Back-compat shim: the older entry point ignored touched ids.
1754    fn execute_update_inner(
1755        &self,
1756        raw_query: &str,
1757        query: &UpdateQuery,
1758    ) -> RedDBResult<RuntimeQueryResult> {
1759        self.execute_update_inner_tracked(raw_query, query)
1760            .map(|(res, _)| res)
1761    }
1762
1763    fn execute_update_inner_tracked(
1764        &self,
1765        raw_query: &str,
1766        query: &UpdateQuery,
1767    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1768        let store = self.inner.db.store();
1769        let effective_filter = effective_update_filter(query);
1770        let compiled_plan = self.compile_update_plan(query)?;
1771        let needs_rmw_lock = update_needs_rmw_lock(query);
1772        let table_rmw_lock = if needs_rmw_lock {
1773            Some(
1774                self.inner
1775                    .rmw_locks
1776                    .lock_for(&query.table, "__table_rmw_update__"),
1777            )
1778        } else {
1779            None
1780        };
1781        let _table_rmw_guard = table_rmw_lock.as_ref().map(|lock| lock.lock());
1782        let mut touched_ids: Vec<EntityId> = Vec::new();
1783        let limit_cap = query.limit.map(|l| l as usize);
1784        let manager = store
1785            .get_collection(&query.table)
1786            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1787        let scan_limit = if query.order_by.is_empty() {
1788            limit_cap
1789        } else {
1790            None
1791        };
1792        let mut target_scan = super::dml_target_scan::DmlTargetScan::with_update_target(
1793            self,
1794            &query.table,
1795            effective_filter.as_ref(),
1796            scan_limit,
1797            query.target,
1798        );
1799        if needs_rmw_lock {
1800            target_scan = target_scan.with_live_table_rows();
1801        }
1802        let ids_to_update = target_scan.find_target_ids()?;
1803        let ids_to_update = if query.order_by.is_empty() {
1804            ids_to_update
1805        } else {
1806            ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1807        };
1808
1809        if needs_rmw_lock {
1810            return self.execute_update_inner_tracked_locked(
1811                raw_query,
1812                query,
1813                &compiled_plan,
1814                &ids_to_update,
1815                effective_filter.as_ref(),
1816            );
1817        }
1818
1819        let mut affected: u64 = 0;
1820        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1821            let mut applied_chunk = Vec::with_capacity(chunk.len());
1822            for entity in manager.get_many(chunk).into_iter().flatten() {
1823                let assignments =
1824                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1825                let applied = self.apply_materialized_update_for_entity(
1826                    query.table.clone(),
1827                    entity,
1828                    &compiled_plan,
1829                    assignments,
1830                )?;
1831                touched_ids.push(applied.id);
1832                applied_chunk.push(applied);
1833            }
1834            self.persist_update_chunk(&applied_chunk)?;
1835            affected += applied_chunk.len() as u64;
1836            let lsns = self.flush_update_chunk(&applied_chunk)?;
1837            if !query.suppress_events {
1838                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1839            }
1840        }
1841
1842        if affected > 0 {
1843            self.note_table_write(&query.table);
1844        }
1845
1846        Ok((
1847            RuntimeQueryResult::dml_result(
1848                raw_query.to_string(),
1849                affected,
1850                "update",
1851                "runtime-dml",
1852            ),
1853            touched_ids,
1854        ))
1855    }
1856
1857    fn execute_update_inner_tracked_locked(
1858        &self,
1859        raw_query: &str,
1860        query: &UpdateQuery,
1861        compiled_plan: &CompiledUpdatePlan,
1862        ids_to_update: &[EntityId],
1863        effective_filter: Option<&Filter>,
1864    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1865        let store = self.inner.db.store();
1866        let mut touched_ids = Vec::new();
1867        let mut lock_entries = Vec::new();
1868
1869        for id in ids_to_update {
1870            let Some(candidate) = store.get(&query.table, *id) else {
1871                continue;
1872            };
1873            let logical_id = candidate.logical_id();
1874            let lock_key = format!("row:{}", logical_id.raw());
1875            let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1876            lock_entries.push((lock_key, logical_id, rmw_lock));
1877        }
1878
1879        lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1880        lock_entries.dedup_by(|left, right| left.0 == right.0);
1881        let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1882
1883        let mut applied_chunk = Vec::new();
1884        for (_, logical_id, _) in &lock_entries {
1885            let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1886            else {
1887                continue;
1888            };
1889            if let Some(filter) = effective_filter {
1890                if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1891                    Some(self.inner.db.as_ref()),
1892                    &entity,
1893                    filter,
1894                    &query.table,
1895                    &query.table,
1896                ) {
1897                    continue;
1898                }
1899            }
1900
1901            let assignments =
1902                self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1903            let applied = self.apply_materialized_update_for_entity(
1904                query.table.clone(),
1905                entity,
1906                compiled_plan,
1907                assignments,
1908            )?;
1909            touched_ids.push(applied.id);
1910            applied_chunk.push(applied);
1911        }
1912
1913        let affected = applied_chunk.len() as u64;
1914        if !applied_chunk.is_empty() {
1915            self.persist_update_chunk(&applied_chunk)?;
1916            let lsns = self.flush_update_chunk(&applied_chunk)?;
1917            if !query.suppress_events {
1918                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1919            }
1920        }
1921
1922        if affected > 0 {
1923            self.note_table_write(&query.table);
1924        }
1925
1926        Ok((
1927            RuntimeQueryResult::dml_result(
1928                raw_query.to_string(),
1929                affected,
1930                "update",
1931                "runtime-dml",
1932            ),
1933            touched_ids,
1934        ))
1935    }
1936
1937    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1938        let mut static_field_assignments = Vec::new();
1939        let mut static_metadata_assignments = Vec::new();
1940        let mut dynamic_assignments = Vec::new();
1941        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1942        let mut row_modified_columns = Vec::new();
1943
1944        for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1945            let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1946            let metadata_key = resolve_sql_ttl_metadata_key(column);
1947            if compound_op.is_some() && metadata_key.is_some() {
1948                return Err(RedDBError::Query(format!(
1949                    "compound assignment is only supported for row fields: {column}"
1950                )));
1951            }
1952            if compound_op.is_none() {
1953                if let Ok(value) = fold_expr_to_value(expr.clone()) {
1954                    if let Some(metadata_key) = metadata_key {
1955                        let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1956                        let (canonical_key, canonical_value) =
1957                            canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1958                        static_metadata_assignments
1959                            .push((canonical_key.to_string(), canonical_value));
1960                    } else {
1961                        let value = self.resolve_crypto_sentinel(value)?;
1962                        static_field_assignments.push((
1963                            column.clone(),
1964                            normalize_row_update_assignment_with_plan(
1965                                &query.table,
1966                                column,
1967                                value,
1968                                row_contract_plan.as_ref(),
1969                            )?,
1970                        ));
1971                        row_modified_columns.push(column.clone());
1972                    }
1973                    continue;
1974                }
1975            }
1976
1977            dynamic_assignments.push(CompiledUpdateAssignment {
1978                column: column.clone(),
1979                expr: expr.clone(),
1980                compound_op,
1981                metadata_key,
1982                row_rule: if metadata_key.is_none() {
1983                    if let Some(plan) = row_contract_plan.as_ref() {
1984                        if plan.timestamps_enabled
1985                            && (column == "created_at" || column == "updated_at")
1986                        {
1987                            return Err(RedDBError::Query(format!(
1988                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1989                                query.table, column
1990                            )));
1991                        }
1992                        if let Some(rule) = plan.declared_rules.get(column) {
1993                            Some(rule.clone())
1994                        } else if plan.strict_schema {
1995                            return Err(RedDBError::Query(format!(
1996                                "collection '{}' is strict and does not allow undeclared fields: {}",
1997                                query.table, column
1998                            )));
1999                        } else {
2000                            None
2001                        }
2002                    } else {
2003                        None
2004                    }
2005                } else {
2006                    None
2007                },
2008            });
2009            if metadata_key.is_none() {
2010                row_modified_columns.push(column.clone());
2011            }
2012        }
2013
2014        let row_modified_columns = dedupe_update_columns(row_modified_columns);
2015        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
2016            row_modified_columns.iter().any(|column| {
2017                plan.unique_columns
2018                    .keys()
2019                    .any(|unique| unique.eq_ignore_ascii_case(column))
2020            })
2021        });
2022
2023        if let Some(ttl_ms) = query.ttl_ms {
2024            static_metadata_assignments
2025                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
2026        }
2027        if let Some(expires_at_ms) = query.expires_at_ms {
2028            static_metadata_assignments.push((
2029                "_expires_at".to_string(),
2030                metadata_u64_to_value(expires_at_ms),
2031            ));
2032        }
2033        for (key, val) in &query.with_metadata {
2034            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
2035        }
2036
2037        Ok(CompiledUpdatePlan {
2038            static_field_assignments,
2039            static_metadata_assignments,
2040            dynamic_assignments,
2041            row_contract_plan,
2042            row_modified_columns,
2043            row_touches_unique_columns,
2044        })
2045    }
2046
2047    fn materialize_update_assignments_for_entity(
2048        &self,
2049        query: &UpdateQuery,
2050        entity: &UnifiedEntity,
2051        compiled_plan: &CompiledUpdatePlan,
2052    ) -> RedDBResult<MaterializedUpdateAssignments> {
2053        let mut assignments = MaterializedUpdateAssignments::default();
2054        let mut record: Option<UnifiedRecord> = None;
2055
2056        for assignment in &compiled_plan.dynamic_assignments {
2057            if assignment.compound_op.is_some()
2058                && !matches!(
2059                    entity.data,
2060                    EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
2061                )
2062            {
2063                return Err(RedDBError::Query(format!(
2064                    "compound assignment is only supported for row or graph UPDATE column '{}'",
2065                    assignment.column
2066                )));
2067            }
2068            if record.is_none() {
2069                record = runtime_any_record_from_entity_ref(entity);
2070            }
2071            let Some(record) = record.as_ref() else {
2072                return Err(RedDBError::Query(format!(
2073                    "UPDATE could not materialize runtime record for entity {} in '{}'",
2074                    entity.id.raw(),
2075                    query.table
2076                )));
2077            };
2078            let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
2079                Some(self.inner.db.as_ref()),
2080                &assignment.expr,
2081                record,
2082                Some(query.table.as_str()),
2083                Some(query.table.as_str()),
2084            )
2085            .ok_or_else(|| {
2086                RedDBError::Query(format!(
2087                    "failed to evaluate UPDATE expression for column '{}'",
2088                    assignment.column
2089                ))
2090            })?;
2091            let value = if let Some(op) = assignment.compound_op {
2092                evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
2093            } else {
2094                rhs
2095            };
2096
2097            if let Some(metadata_key) = assignment.metadata_key {
2098                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
2099                let (canonical_key, canonical_value) =
2100                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
2101                assignments
2102                    .dynamic_metadata_assignments
2103                    .push((canonical_key.to_string(), canonical_value));
2104            } else {
2105                assignments.dynamic_field_assignments.push((
2106                    assignment.column.clone(),
2107                    normalize_row_update_value_for_rule(
2108                        &query.table,
2109                        self.resolve_crypto_sentinel(value)?,
2110                        assignment.row_rule.as_ref(),
2111                    )?,
2112                ));
2113            }
2114        }
2115
2116        Ok(assignments)
2117    }
2118
2119    fn apply_materialized_update_for_entity(
2120        &self,
2121        collection: String,
2122        entity: UnifiedEntity,
2123        compiled_plan: &CompiledUpdatePlan,
2124        assignments: MaterializedUpdateAssignments,
2125    ) -> RedDBResult<AppliedEntityMutation> {
2126        if matches!(entity.data, EntityData::Row(_)) {
2127            return self.apply_loaded_sql_update_row_core(
2128                collection,
2129                entity,
2130                &compiled_plan.static_field_assignments,
2131                assignments.dynamic_field_assignments,
2132                &compiled_plan.static_metadata_assignments,
2133                assignments.dynamic_metadata_assignments,
2134                compiled_plan.row_contract_plan.as_ref(),
2135                &compiled_plan.row_modified_columns,
2136                compiled_plan.row_touches_unique_columns,
2137            );
2138        }
2139
2140        ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2141
2142        let operations = build_patch_operations_from_materialized_assignments(
2143            &entity,
2144            compiled_plan,
2145            assignments,
2146        );
2147        self.apply_loaded_patch_entity_core(
2148            collection,
2149            entity,
2150            crate::json::Value::Null,
2151            operations,
2152        )
2153    }
2154
2155    /// Execute DELETE FROM table WHERE filter
2156    pub fn execute_delete(
2157        &self,
2158        raw_query: &str,
2159        query: &DeleteQuery,
2160    ) -> RedDBResult<RuntimeQueryResult> {
2161        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2162        // Issue #523 — blockchain collections are immutable; see
2163        // execute_update for the same gate.
2164        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2165            return Err(RedDBError::InvalidOperation(format!(
2166                "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2167                query.table
2168            )));
2169        }
2170        // CollectionContract gate (#50) — see execute_update for
2171        // rationale. The gate handles APPEND ONLY rejection and is
2172        // the single point where future contract bits land.
2173        crate::runtime::collection_contract::CollectionContractGate::check(
2174            self,
2175            &query.table,
2176            crate::runtime::collection_contract::MutationKind::Delete,
2177        )?;
2178
2179        // RETURNING on DELETE: capture the pre-image via an internal
2180        // SELECT that reuses the same WHERE, then run the delete with
2181        // the RETURNING clause stripped, then project the captured
2182        // rows through the requested items. The extra SELECT is a
2183        // pragmatic MVP — a future pass can fuse the scan with the
2184        // delete to avoid the second pass over the heap.
2185        if let Some(items) = query.returning.clone() {
2186            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2187                RedDBError::Query(
2188                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2189                )
2190            })?;
2191            let captured = self.execute_query(&select_sql)?;
2192
2193            let mut inner_query = query.clone();
2194            inner_query.returning = None;
2195            let _ = self.execute_delete(raw_query, &inner_query)?;
2196
2197            let snapshots: Vec<Vec<(String, Value)>> = captured
2198                .result
2199                .records
2200                .iter()
2201                .map(|rec| {
2202                    rec.iter_fields()
2203                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2204                        .collect()
2205                })
2206                .collect();
2207            let affected = snapshots.len() as u64;
2208            let result = build_returning_result(&items, &snapshots, None);
2209
2210            let mut response = RuntimeQueryResult::dml_result(
2211                raw_query.to_string(),
2212                affected,
2213                "delete",
2214                "runtime-dml-returning",
2215            );
2216            response.result = result;
2217            return Ok(response);
2218        }
2219        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
2220        //
2221        // When the table has RLS enabled, gate the DELETE by the
2222        // per-role policy set: mutations only touch rows that *every*
2223        // matching `FOR DELETE` policy would accept. No policies =>
2224        // zero rows affected (PG restrictive-default).
2225        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2226            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2227                self,
2228                &query.table,
2229                crate::storage::query::ast::PolicyAction::Delete,
2230            );
2231            let Some(policy) = rls_filter else {
2232                return Ok(RuntimeQueryResult::dml_result(
2233                    raw_query.to_string(),
2234                    0,
2235                    "delete",
2236                    "runtime-dml-rls",
2237                ));
2238            };
2239            // Fold the policy predicate into the user's WHERE before
2240            // dispatching — the remainder of this function reads the
2241            // filter from `query` via `effective_delete_filter`, which
2242            // respects the updated value.
2243            let mut augmented = query.clone();
2244            augmented.filter = Some(match augmented.filter.take() {
2245                Some(existing) => {
2246                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2247                }
2248                None => policy,
2249            });
2250            return self.execute_delete_inner(raw_query, &augmented);
2251        }
2252        self.execute_delete_inner(raw_query, query)
2253    }
2254
2255    fn execute_delete_inner(
2256        &self,
2257        raw_query: &str,
2258        query: &DeleteQuery,
2259    ) -> RedDBResult<RuntimeQueryResult> {
2260        let effective_filter = effective_delete_filter(query);
2261
2262        // Find the rows that match the WHERE clause. The "find target
2263        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
2264        // the same scan strategy.
2265        let scan = super::dml_target_scan::DmlTargetScan::new(
2266            self,
2267            &query.table,
2268            effective_filter.as_ref(),
2269            None,
2270        );
2271        let ids_to_delete = scan.find_target_ids()?;
2272
2273        // For event-enabled collections, snapshot the pre-delete state
2274        // before rows are physically removed.
2275        let needs_delete_events =
2276            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2277        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2278            scan.row_json_pre_images(&ids_to_delete)
2279        } else {
2280            HashMap::new()
2281        };
2282
2283        let mut affected: u64 = 0;
2284        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2285            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2286            affected += count;
2287            if needs_delete_events && !lsns.is_empty() {
2288                // lsns.len() == actually-deleted entities; align with chunk ids.
2289                // `delete_batch` may skip missing entities, so we correlate by
2290                // the number returned (they're emitted in chunk order).
2291                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2292                self.emit_delete_events_for_collection(
2293                    &query.table,
2294                    deleted_chunk,
2295                    &lsns,
2296                    &pre_images,
2297                )?;
2298            }
2299        }
2300        pre_images.clear();
2301
2302        if affected > 0 {
2303            self.note_table_write(&query.table);
2304        }
2305
2306        Ok(RuntimeQueryResult::dml_result(
2307            raw_query.to_string(),
2308            affected,
2309            "delete",
2310            "runtime-dml",
2311        ))
2312    }
2313}
2314
2315/// Reject UPDATE … NODES/EDGES that assign to graph identity/topology
2316/// columns regardless of whether any row matches the WHERE clause. The
2317/// per-entity guard below covers only the matched-rows case, but ADR 0019
2318/// declares these columns immutable on the surface itself, so a zero-row
2319/// UPDATE should still surface the same error to operators and SDKs.
2320fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2321    if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2322        return Ok(());
2323    }
2324    for (column, _) in &query.assignment_exprs {
2325        if is_immutable_graph_identity_field(column) {
2326            return Err(RedDBError::Query(format!(
2327                "immutable graph field '{column}' cannot be updated"
2328            )));
2329        }
2330    }
2331    Ok(())
2332}
2333
2334fn ensure_graph_identity_update_allowed(
2335    entity: &UnifiedEntity,
2336    compiled_plan: &CompiledUpdatePlan,
2337    assignments: &MaterializedUpdateAssignments,
2338) -> RedDBResult<()> {
2339    if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2340        return Ok(());
2341    }
2342
2343    for (column, _) in compiled_plan
2344        .static_field_assignments
2345        .iter()
2346        .chain(assignments.dynamic_field_assignments.iter())
2347    {
2348        if is_immutable_graph_identity_field(column) {
2349            return Err(RedDBError::Query(format!(
2350                "immutable graph field '{column}' cannot be updated"
2351            )));
2352        }
2353    }
2354
2355    Ok(())
2356}
2357
/// Returns `true` when `column` names one of the graph identity/topology
/// fields (`rid`, `label`, `from_rid`, `to_rid`, `from`, `to`), compared
/// ASCII case-insensitively.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const IMMUTABLE_GRAPH_FIELDS: &[&str] = &["rid", "label", "from_rid", "to_rid", "from", "to"];

    for reserved in IMMUTABLE_GRAPH_FIELDS {
        if reserved.eq_ignore_ascii_case(column) {
            return true;
        }
    }
    false
}
2363
2364fn build_patch_operations_from_materialized_assignments(
2365    entity: &UnifiedEntity,
2366    compiled_plan: &CompiledUpdatePlan,
2367    assignments: MaterializedUpdateAssignments,
2368) -> Vec<PatchEntityOperation> {
2369    let mut operations = Vec::with_capacity(
2370        compiled_plan.static_field_assignments.len()
2371            + compiled_plan.static_metadata_assignments.len()
2372            + assignments.dynamic_field_assignments.len()
2373            + assignments.dynamic_metadata_assignments.len(),
2374    );
2375
2376    for (column, value) in &compiled_plan.static_field_assignments {
2377        operations.push(PatchEntityOperation {
2378            op: PatchEntityOperationType::Set,
2379            path: update_patch_path_for_entity(entity, column),
2380            value: Some(storage_value_to_json(value)),
2381        });
2382    }
2383
2384    for (column, value) in assignments.dynamic_field_assignments {
2385        operations.push(PatchEntityOperation {
2386            op: PatchEntityOperationType::Set,
2387            path: update_patch_path_for_entity(entity, &column),
2388            value: Some(storage_value_to_json(&value)),
2389        });
2390    }
2391
2392    for (key, value) in &compiled_plan.static_metadata_assignments {
2393        operations.push(PatchEntityOperation {
2394            op: PatchEntityOperationType::Set,
2395            path: vec!["metadata".to_string(), key.clone()],
2396            value: Some(metadata_value_to_json(value)),
2397        });
2398    }
2399
2400    for (key, value) in assignments.dynamic_metadata_assignments {
2401        operations.push(PatchEntityOperation {
2402            op: PatchEntityOperationType::Set,
2403            path: vec!["metadata".to_string(), key],
2404            value: Some(metadata_value_to_json(&value)),
2405        });
2406    }
2407
2408    operations
2409}
2410
2411fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2412    if matches!(
2413        (&entity.kind, &entity.data),
2414        (
2415            crate::storage::EntityKind::GraphNode(_),
2416            EntityData::Node(_)
2417        )
2418    ) && column.eq_ignore_ascii_case("node_type")
2419    {
2420        return vec!["node_type".to_string()];
2421    }
2422    if matches!(
2423        (&entity.kind, &entity.data),
2424        (
2425            crate::storage::EntityKind::GraphEdge(_),
2426            EntityData::Edge(_)
2427        )
2428    ) && column.eq_ignore_ascii_case("weight")
2429    {
2430        return vec!["weight".to_string()];
2431    }
2432    vec!["fields".to_string(), column.to_string()]
2433}
2434
2435/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
2436/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
2437/// capture the pre-image before actually removing the rows. Returns
2438/// `None` when the input does not start with `DELETE`.
2439///
2440/// Case-insensitive on the keywords. Preserves everything between
2441/// the table name and the RETURNING clause, so WHERE / ORDER BY /
2442/// LIMIT survive untouched. The RETURNING tail — if present — is
2443/// truncated at the first top-level `RETURNING` token.
2444fn delete_to_select_sql(sql: &str) -> Option<String> {
2445    let trimmed = sql.trim_start();
2446    let lowered = trimmed.to_ascii_lowercase();
2447    if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
2448        return None;
2449    }
2450    // Find `FROM` after DELETE.
2451    let from_idx = lowered.find(" from ")?;
2452    let after_from = &trimmed[from_idx + " from ".len()..];
2453    let after_from_lc = &lowered[from_idx + " from ".len()..];
2454
2455    // Cut off the RETURNING tail (a naive search — the RETURNING
2456    // clause only appears once per statement at top level in our
2457    // grammar). Matches whitespace-bounded tokens to avoid clipping
2458    // `RETURNING` inside a string literal.
2459    let mut body = after_from.to_string();
2460    if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
2461        body.truncate(pos);
2462    }
2463    Some(format!("SELECT * FROM {}", body.trim_end()))
2464}
2465
/// Returns the byte offset of the first occurrence of `needle` in
/// `haystack` that lies outside single-quoted string literals and is
/// bounded on both sides by ASCII whitespace or the ends of the haystack.
///
/// The haystack is expected to be lowercased already; the comparison is an
/// exact byte match. Naive — every `'` simply toggles the "inside literal"
/// state, with no escape handling — but enough for the shapes the DML
/// parser emits.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let hay = haystack.as_bytes();
    let key = needle.as_bytes();
    let mut inside_literal = false;

    for (pos, &byte) in hay.iter().enumerate() {
        if byte == b'\'' {
            inside_literal = !inside_literal;
            continue;
        }
        if inside_literal {
            continue;
        }

        let end = pos + key.len();
        // `get` yields `None` when the needle would run past the haystack.
        let Some(candidate) = hay.get(pos..end) else {
            continue;
        };
        if candidate != key {
            continue;
        }

        let bounded_left = pos == 0 || hay[pos - 1].is_ascii_whitespace();
        let bounded_right = hay.get(end).map_or(true, |next| next.is_ascii_whitespace());
        if bounded_left && bounded_right {
            return Some(pos);
        }
    }
    None
}
2494
2495/// Build a `UnifiedResult` from the rows affected by a DML statement plus
2496/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
2497/// for one affected row; `outputs`, when provided, supplies the engine-
2498/// assigned entity id for the same row (INSERT path). Projection honours
2499/// the RETURNING items: `*` expands to every snapshot column plus
2500/// the public row envelope when available.
2501fn build_returning_result(
2502    items: &[ReturningItem],
2503    snapshots: &[Vec<(String, Value)>],
2504    outputs: Option<&[CreateEntityOutput]>,
2505) -> UnifiedResult {
2506    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2507    let public_item_outputs = outputs.is_some_and(|outs| {
2508        outs.first()
2509            .and_then(|out| out.entity.as_ref())
2510            .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2511    });
2512
2513    let mut columns: Vec<String> = if project_all {
2514        let mut cols: Vec<String> = Vec::new();
2515        if public_item_outputs {
2516            cols.extend(
2517                [
2518                    "rid",
2519                    "collection",
2520                    "kind",
2521                    "tenant",
2522                    "created_at",
2523                    "updated_at",
2524                ]
2525                .into_iter()
2526                .map(str::to_string),
2527            );
2528        } else if outputs.is_some() {
2529            cols.push("rid".to_string());
2530        }
2531        if let Some(first) = snapshots.first() {
2532            for (name, _) in first {
2533                cols.push(name.clone());
2534            }
2535        }
2536        cols
2537    } else {
2538        items
2539            .iter()
2540            .filter_map(|it| match it {
2541                ReturningItem::Column(c) => Some(c.clone()),
2542                ReturningItem::All => None,
2543            })
2544            .collect()
2545    };
2546    // Guarantee unique order-preserving column list.
2547    {
2548        let mut seen = std::collections::HashSet::new();
2549        columns.retain(|c| seen.insert(c.clone()));
2550    }
2551
2552    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2553    for (idx, snap) in snapshots.iter().enumerate() {
2554        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2555        if let Some(outs) = outputs {
2556            if let Some(out) = outs.get(idx) {
2557                if let Some(entity) = out.entity.as_ref() {
2558                    if let Some(kind) = public_returning_item_kind(entity) {
2559                        values.insert(
2560                            Arc::clone(&sys_key_rid()),
2561                            Value::UnsignedInteger(out.id.raw()),
2562                        );
2563                        values.insert(
2564                            Arc::clone(&sys_key_collection()),
2565                            Value::text(entity.kind.collection().to_string()),
2566                        );
2567                        values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2568                        values.insert(
2569                            Arc::clone(&sys_key_created_at()),
2570                            Value::UnsignedInteger(entity.created_at),
2571                        );
2572                        values.insert(
2573                            Arc::clone(&sys_key_updated_at()),
2574                            Value::UnsignedInteger(entity.updated_at),
2575                        );
2576                    } else {
2577                        values.insert(
2578                            Arc::clone(&sys_key_rid()),
2579                            Value::Integer(out.id.raw() as i64),
2580                        );
2581                    }
2582                } else {
2583                    values.insert(
2584                        Arc::clone(&sys_key_rid()),
2585                        Value::Integer(out.id.raw() as i64),
2586                    );
2587                }
2588            }
2589        }
2590        for (name, val) in snap {
2591            values.insert(Arc::from(name.as_str()), val.clone());
2592        }
2593        if !values.contains_key("tenant") {
2594            let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2595            values.insert(Arc::clone(&sys_key_tenant()), tenant);
2596        }
2597        let mut rec = UnifiedRecord::default();
2598        // Only keep projected columns on the record.
2599        for col in &columns {
2600            if let Some(v) = values.get(col.as_str()) {
2601                rec.set_arc(Arc::from(col.as_str()), v.clone());
2602            }
2603        }
2604        records.push(rec);
2605    }
2606
2607    UnifiedResult {
2608        columns,
2609        records,
2610        stats: Default::default(),
2611        pre_serialized_json: None,
2612    }
2613}
2614
2615fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2616    match (&entity.kind, &entity.data) {
2617        (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2618            Some("node")
2619        }
2620        (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2621            Some("edge")
2622        }
2623        (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2624        // #1369 — every entity model must expose its `rid` in RETURNING *.
2625        // Vectors carry their payload in `EntityData::Vector`, not `Row`, so
2626        // they were falling through to a no-rid envelope
2627        // and `RETURNING *` never surfaced the entity-id.
2628        (_, crate::storage::EntityData::Vector(_)) => Some("vector"),
2629        _ => None,
2630    }
2631}
2632
2633fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2634    let Some(row) = entity.data.as_row() else {
2635        return "row";
2636    };
2637
2638    let is_kv = row.named.as_ref().is_some_and(|named| {
2639        (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2640            || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2641    });
2642    if is_kv {
2643        return "kv";
2644    }
2645
2646    let is_document = row
2647        .named
2648        .as_ref()
2649        .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2650        || row.columns.iter().any(runtime_returning_documentish_value);
2651    if is_document {
2652        "document"
2653    } else {
2654        "row"
2655    }
2656}
2657
2658fn runtime_returning_documentish_value(value: &Value) -> bool {
2659    matches!(value, Value::Json(_) | Value::Blob(_))
2660}
2661
2662fn row_insert_returning_snapshots(
2663    outputs: &[CreateEntityOutput],
2664    fallback: Vec<Vec<(String, Value)>>,
2665) -> Vec<Vec<(String, Value)>> {
2666    outputs
2667        .iter()
2668        .enumerate()
2669        .map(|(idx, out)| {
2670            out.entity
2671                .as_ref()
2672                .map(entity_row_fields_snapshot)
2673                .filter(|snap| !snap.is_empty())
2674                .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2675        })
2676        .collect()
2677}
2678
2679fn graph_insert_returning_snapshots(
2680    store: &crate::storage::unified::UnifiedStore,
2681    collection: &str,
2682    ids: &[EntityId],
2683) -> Vec<Vec<(String, Value)>> {
2684    let Some(manager) = store.get_collection(collection) else {
2685        return Vec::new();
2686    };
2687
2688    ids.iter()
2689        .filter_map(|id| manager.get(*id))
2690        .filter_map(|entity| {
2691            let mut record = runtime_any_record_from_entity_ref(&entity)?;
2692            record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2693            Some(record)
2694        })
2695        .map(|record| {
2696            record
2697                .iter_fields()
2698                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2699                .collect()
2700        })
2701        .collect()
2702}
2703
2704fn graph_update_returning_snapshots(
2705    runtime: &RedDBRuntime,
2706    collection: &str,
2707    ids: &[EntityId],
2708) -> Vec<Vec<(String, Value)>> {
2709    let store = runtime.db().store();
2710    let Some(manager) = store.get_collection(collection) else {
2711        return Vec::new();
2712    };
2713
2714    manager
2715        .get_many(ids)
2716        .into_iter()
2717        .flatten()
2718        .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2719        .map(|record| {
2720            record
2721                .iter_fields()
2722                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2723                .collect()
2724        })
2725        .collect()
2726}
2727
2728fn ensure_update_target_contract(
2729    runtime: &RedDBRuntime,
2730    collection: &str,
2731    target: UpdateTarget,
2732) -> RedDBResult<()> {
2733    let Some(contract) = runtime.db().collection_contract(collection) else {
2734        return Ok(());
2735    };
2736    if update_target_contract_is_advisory(&contract)
2737        || update_target_allows_model(contract.declared_model, update_target_model(target))
2738    {
2739        return Ok(());
2740    }
2741    Err(RedDBError::InvalidOperation(format!(
2742        "collection '{}' is declared as '{}' and does not allow '{}' updates",
2743        collection,
2744        update_model_name(contract.declared_model),
2745        update_model_name(update_target_model(target))
2746    )))
2747}
2748
2749fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2750    matches!(
2751        (&contract.origin, &contract.schema_mode),
2752        (
2753            crate::physical::ContractOrigin::Implicit,
2754            crate::catalog::SchemaMode::Dynamic,
2755        )
2756    )
2757}
2758
2759fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2760    match target {
2761        UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2762        UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2763        UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2764        UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2765    }
2766}
2767
2768fn update_target_allows_model(
2769    declared_model: crate::catalog::CollectionModel,
2770    requested_model: crate::catalog::CollectionModel,
2771) -> bool {
2772    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2773}
2774
2775fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2776    match model {
2777        crate::catalog::CollectionModel::Table => "table",
2778        crate::catalog::CollectionModel::Document => "document",
2779        crate::catalog::CollectionModel::Graph => "graph",
2780        crate::catalog::CollectionModel::Vector => "vector",
2781        crate::catalog::CollectionModel::Hll => "hll",
2782        crate::catalog::CollectionModel::Sketch => "sketch",
2783        crate::catalog::CollectionModel::Filter => "filter",
2784        crate::catalog::CollectionModel::Kv => "kv",
2785        crate::catalog::CollectionModel::Config => "config",
2786        crate::catalog::CollectionModel::Vault => "vault",
2787        crate::catalog::CollectionModel::Mixed => "mixed",
2788        crate::catalog::CollectionModel::TimeSeries => "timeseries",
2789        crate::catalog::CollectionModel::Queue => "queue",
2790        crate::catalog::CollectionModel::Metrics => "metrics",
2791    }
2792}
2793
2794fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2795    let db = runtime.db();
2796    if let Some(contract) = db.collection_contract(collection) {
2797        let advisory_implicit_dynamic = matches!(
2798            (&contract.origin, &contract.schema_mode),
2799            (
2800                crate::physical::ContractOrigin::Implicit,
2801                crate::catalog::SchemaMode::Dynamic,
2802            )
2803        );
2804        if advisory_implicit_dynamic
2805            || matches!(
2806                contract.declared_model,
2807                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2808            )
2809        {
2810            return Ok(());
2811        }
2812        return Err(RedDBError::InvalidOperation(format!(
2813            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2814            collection, contract.declared_model
2815        )));
2816    }
2817
2818    let now = std::time::SystemTime::now()
2819        .duration_since(std::time::UNIX_EPOCH)
2820        .unwrap_or_default()
2821        .as_millis();
2822    db.save_collection_contract(crate::physical::CollectionContract {
2823        name: collection.to_string(),
2824        declared_model: crate::catalog::CollectionModel::Graph,
2825        schema_mode: crate::catalog::SchemaMode::Dynamic,
2826        origin: crate::physical::ContractOrigin::Implicit,
2827        version: 1,
2828        created_at_unix_ms: now,
2829        updated_at_unix_ms: now,
2830        default_ttl_ms: db.collection_default_ttl_ms(collection),
2831        vector_dimension: None,
2832        vector_metric: None,
2833        context_index_fields: Vec::new(),
2834        declared_columns: Vec::new(),
2835        table_def: None,
2836        timestamps_enabled: false,
2837        context_index_enabled: false,
2838        metrics_raw_retention_ms: None,
2839        metrics_rollup_policies: Vec::new(),
2840        metrics_tenant_identity: None,
2841        metrics_namespace: None,
2842        append_only: false,
2843        subscriptions: Vec::new(),
2844        analytics_config: Vec::new(),
2845        session_key: None,
2846        session_gap_ms: None,
2847        retention_duration_ms: None,
2848        analytical_storage: None,
2849
2850        ai_policy: None,
2851    })
2852    .map(|_| ())
2853    .map_err(|err| RedDBError::Internal(err.to_string()))
2854}
2855
2856fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2857    query
2858        .assignment_exprs
2859        .iter()
2860        .enumerate()
2861        .any(|(idx, (column, expr))| {
2862            query
2863                .compound_assignment_ops
2864                .get(idx)
2865                .is_some_and(|op| op.is_some())
2866                || expr_references_update_column(expr, &query.table, column)
2867        })
2868}
2869
2870fn evaluate_compound_update_assignment(
2871    column: &str,
2872    record: &UnifiedRecord,
2873    op: BinOp,
2874    rhs: Value,
2875) -> RedDBResult<Value> {
2876    let lhs = record.get(column).ok_or_else(|| {
2877        RedDBError::Query(format!(
2878            "compound assignment requires existing numeric field '{column}'"
2879        ))
2880    })?;
2881    if matches!(lhs, Value::Null) {
2882        return Err(RedDBError::Query(format!(
2883            "compound assignment requires non-null numeric field '{column}'"
2884        )));
2885    }
2886    apply_compound_numeric_op(column, op, lhs, &rhs)
2887}
2888
2889fn apply_compound_numeric_op(
2890    column: &str,
2891    op: BinOp,
2892    lhs: &Value,
2893    rhs: &Value,
2894) -> RedDBResult<Value> {
2895    let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2896        return Err(RedDBError::Query(format!(
2897            "compound assignment requires numeric field '{column}'"
2898        )));
2899    };
2900    let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2901        return Err(RedDBError::Query(format!(
2902            "compound assignment requires numeric right-hand value for field '{column}'"
2903        )));
2904    };
2905
2906    if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2907        let a = lhs_number.as_f64();
2908        let b = rhs_number.as_f64();
2909        let out = match op {
2910            BinOp::Add => a + b,
2911            BinOp::Sub => a - b,
2912            BinOp::Mul => a * b,
2913            BinOp::Div => {
2914                if b == 0.0 {
2915                    return Err(RedDBError::Query(format!(
2916                        "division by zero in compound assignment for field '{column}'"
2917                    )));
2918                }
2919                a / b
2920            }
2921            BinOp::Mod => {
2922                if b == 0.0 {
2923                    return Err(RedDBError::Query(format!(
2924                        "modulo by zero in compound assignment for field '{column}'"
2925                    )));
2926                }
2927                a % b
2928            }
2929            _ => {
2930                return Err(RedDBError::Query(format!(
2931                    "unsupported compound assignment operator for field '{column}'"
2932                )));
2933            }
2934        };
2935        if !out.is_finite() {
2936            return Err(RedDBError::Query(format!(
2937                "numeric overflow in compound assignment for field '{column}'"
2938            )));
2939        }
2940        return Ok(Value::Float(out));
2941    }
2942
2943    let a = lhs_number.as_i128();
2944    let b = rhs_number.as_i128();
2945    let out = match op {
2946        BinOp::Add => a.checked_add(b),
2947        BinOp::Sub => a.checked_sub(b),
2948        BinOp::Mul => a.checked_mul(b),
2949        BinOp::Mod => {
2950            if b == 0 {
2951                return Err(RedDBError::Query(format!(
2952                    "modulo by zero in compound assignment for field '{column}'"
2953                )));
2954            }
2955            a.checked_rem(b)
2956        }
2957        BinOp::Div => unreachable!("integer division is handled by the float branch"),
2958        _ => None,
2959    }
2960    .ok_or_else(|| {
2961        RedDBError::Query(format!(
2962            "numeric overflow in compound assignment for field '{column}'"
2963        ))
2964    })?;
2965
2966    if matches!(lhs, Value::UnsignedInteger(_)) {
2967        let value = u64::try_from(out).map_err(|_| {
2968            RedDBError::Query(format!(
2969                "numeric overflow in compound assignment for field '{column}'"
2970            ))
2971        })?;
2972        Ok(Value::UnsignedInteger(value))
2973    } else {
2974        let value = i64::try_from(out).map_err(|_| {
2975            RedDBError::Query(format!(
2976                "numeric overflow in compound assignment for field '{column}'"
2977            ))
2978        })?;
2979        Ok(Value::Integer(value))
2980    }
2981}
2982
2983#[derive(Clone, Copy)]
2984enum CompoundNumber {
2985    Integer(i128),
2986    Float(f64),
2987}
2988
2989impl CompoundNumber {
2990    fn from_value(value: &Value) -> Option<Self> {
2991        match value {
2992            Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2993            Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2994            Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2995            Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2996            _ => None,
2997        }
2998    }
2999
3000    fn is_float(self) -> bool {
3001        matches!(self, Self::Float(_))
3002    }
3003
3004    fn as_f64(self) -> f64 {
3005        match self {
3006            Self::Integer(value) => value as f64,
3007            Self::Float(value) => value,
3008        }
3009    }
3010
3011    fn as_i128(self) -> i128 {
3012        match self {
3013            Self::Integer(value) => value,
3014            Self::Float(_) => unreachable!("float compound number used as integer"),
3015        }
3016    }
3017}
3018
3019fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
3020    match expr {
3021        Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
3022        Expr::Column { field, .. } => {
3023            field_ref_matches_update_column(field, table_name, target_column)
3024        }
3025        Expr::BinaryOp { lhs, rhs, .. } => {
3026            expr_references_update_column(lhs, table_name, target_column)
3027                || expr_references_update_column(rhs, table_name, target_column)
3028        }
3029        Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
3030            expr_references_update_column(operand, table_name, target_column)
3031        }
3032        Expr::FunctionCall { args, .. } => args
3033            .iter()
3034            .any(|arg| expr_references_update_column(arg, table_name, target_column)),
3035        Expr::Case {
3036            branches, else_, ..
3037        } => {
3038            branches.iter().any(|(cond, value)| {
3039                expr_references_update_column(cond, table_name, target_column)
3040                    || expr_references_update_column(value, table_name, target_column)
3041            }) || else_
3042                .as_deref()
3043                .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
3044        }
3045        Expr::IsNull { operand, .. } => {
3046            expr_references_update_column(operand, table_name, target_column)
3047        }
3048        Expr::InList { target, values, .. } => {
3049            expr_references_update_column(target, table_name, target_column)
3050                || values
3051                    .iter()
3052                    .any(|value| expr_references_update_column(value, table_name, target_column))
3053        }
3054        Expr::Between {
3055            target, low, high, ..
3056        } => {
3057            expr_references_update_column(target, table_name, target_column)
3058                || expr_references_update_column(low, table_name, target_column)
3059                || expr_references_update_column(high, table_name, target_column)
3060        }
3061        Expr::WindowFunctionCall { args, window, .. } => {
3062            args.iter()
3063                .any(|arg| expr_references_update_column(arg, table_name, target_column))
3064                || window
3065                    .partition_by
3066                    .iter()
3067                    .any(|e| expr_references_update_column(e, table_name, target_column))
3068                || window
3069                    .order_by
3070                    .iter()
3071                    .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
3072        }
3073    }
3074}
3075
3076fn field_ref_matches_update_column(
3077    field: &FieldRef,
3078    table_name: &str,
3079    target_column: &str,
3080) -> bool {
3081    match field {
3082        FieldRef::TableColumn { table, column } => {
3083            column.eq_ignore_ascii_case(target_column)
3084                && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
3085        }
3086        FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
3087            false
3088        }
3089    }
3090}
3091
3092fn resolve_update_entity_by_logical_id(
3093    runtime: &RedDBRuntime,
3094    table: &str,
3095    logical_id: EntityId,
3096) -> Option<UnifiedEntity> {
3097    let store = runtime.inner.db.store();
3098    if let Some(entity) = store.get_table_row_by_logical_id(table, logical_id) {
3099        return Some(entity);
3100    }
3101    // Fallback for non-table-row entities (graph nodes/edges, etc.) where
3102    // entity_id == logical_id and the MVCC table-row resolver doesn't apply.
3103    store.get(table, logical_id)
3104}
3105
3106fn update_cdc_item_kind(
3107    runtime: &RedDBRuntime,
3108    collection: &str,
3109    entity: &UnifiedEntity,
3110) -> &'static str {
3111    match &entity.data {
3112        EntityData::Node(_) => return "node",
3113        EntityData::Edge(_) => return "edge",
3114        _ => {}
3115    }
3116
3117    match runtime
3118        .db()
3119        .collection_contract(collection)
3120        .map(|contract| contract.declared_model)
3121    {
3122        Some(crate::catalog::CollectionModel::Document) => "document",
3123        Some(crate::catalog::CollectionModel::Kv)
3124        | Some(crate::catalog::CollectionModel::Vault) => "kv",
3125        _ => "row",
3126    }
3127}
3128
3129fn ordered_update_target_ids(
3130    manager: &Arc<crate::storage::SegmentManager>,
3131    entity_ids: &[EntityId],
3132    order_by: &[OrderByClause],
3133    limit: Option<usize>,
3134) -> Vec<EntityId> {
3135    let mut entities: Vec<UnifiedEntity> =
3136        manager.get_many(entity_ids).into_iter().flatten().collect();
3137    entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3138    if let Some(limit) = limit {
3139        entities.truncate(limit);
3140    }
3141    entities.into_iter().map(|entity| entity.id).collect()
3142}
3143
3144fn compare_update_order(
3145    left: &UnifiedEntity,
3146    right: &UnifiedEntity,
3147    order_by: &[OrderByClause],
3148) -> Ordering {
3149    for clause in order_by {
3150        let left_value = update_order_value(left, &clause.field);
3151        let right_value = update_order_value(right, &clause.field);
3152        let ordering = compare_update_order_values(
3153            left_value.as_ref(),
3154            right_value.as_ref(),
3155            clause.nulls_first,
3156        );
3157        if ordering != Ordering::Equal {
3158            return if clause.ascending {
3159                ordering
3160            } else {
3161                ordering.reverse()
3162            };
3163        }
3164    }
3165    left.logical_id().raw().cmp(&right.logical_id().raw())
3166}
3167
3168fn compare_update_order_values(
3169    left: Option<&Value>,
3170    right: Option<&Value>,
3171    nulls_first: bool,
3172) -> Ordering {
3173    match (left, right) {
3174        (None, None) => Ordering::Equal,
3175        (None, Some(_)) => {
3176            if nulls_first {
3177                Ordering::Less
3178            } else {
3179                Ordering::Greater
3180            }
3181        }
3182        (Some(_), None) => {
3183            if nulls_first {
3184                Ordering::Greater
3185            } else {
3186                Ordering::Less
3187            }
3188        }
3189        (Some(left), Some(right)) => {
3190            crate::storage::query::value_compare::total_compare_values(left, right)
3191        }
3192    }
3193}
3194
3195fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3196    let FieldRef::TableColumn { table, column } = field else {
3197        return None;
3198    };
3199    if !table.is_empty() {
3200        return None;
3201    }
3202    if column.eq_ignore_ascii_case("rid") {
3203        return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3204    }
3205    match &entity.data {
3206        EntityData::Row(row) => row.get_field(column).cloned(),
3207        EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3208            .and_then(|record| record.get(column).cloned()),
3209        _ => None,
3210    }
3211}
3212
/// Removes duplicate column names, comparing ASCII case-insensitively.
///
/// The first spelling of each name is kept and the original order is
/// preserved.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let capacity = columns.len();
    columns
        .into_iter()
        .fold(Vec::with_capacity(capacity), |mut kept: Vec<String>, candidate| {
            let already_seen = kept
                .iter()
                .any(|existing| existing.eq_ignore_ascii_case(&candidate));
            if !already_seen {
                kept.push(candidate);
            }
            kept
        })
}
3229
3230// =============================================================================
3231// Helper functions for extracting typed values from column/value pairs
3232// =============================================================================
3233
/// Column names that SQL statements may use to set TTL metadata.
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Maps a column name to its TTL metadata key.
///
/// The comparison is ASCII case-insensitive and the returned key is the
/// lower-case spelling from `SQL_TTL_METADATA_COLUMNS`. Columns that are not
/// TTL metadata columns yield `None`.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|known| column.eq_ignore_ascii_case(known))
}
3247
3248/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
3249/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
3250/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
3251/// `_ttl_ms` and `_expires_at` are passed through.
3252fn canonicalize_sql_ttl_metadata(
3253    key: &'static str,
3254    value: MetadataValue,
3255) -> (&'static str, MetadataValue) {
3256    if key != "_ttl" {
3257        return (key, value);
3258    }
3259    let scaled = match value {
3260        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3261        MetadataValue::Timestamp(ms_or_s) => {
3262            // Timestamp is already chosen for very large values; treat as
3263            // already-ms to avoid silent overflow.
3264            MetadataValue::Timestamp(ms_or_s)
3265        }
3266        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3267        other => other,
3268    };
3269    ("_ttl_ms", scaled)
3270}
3271
/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
/// `SECRET('...')` literals. The runtime strips this marker and
/// applies the actual crypto transform during INSERT execution.
///
/// `RedDBRuntime::resolve_crypto_sentinel` looks for this prefix on
/// `Value::Password` and `Value::Secret` payloads; payloads that do not
/// start with it are passed through unchanged.
pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3276
3277impl RedDBRuntime {
3278    /// Strip the plaintext sentinel from a `Value::Password` or
3279    /// `Value::Secret` produced by the parser and apply the real
3280    /// crypto transform. `Password` is always hashed with argon2id.
3281    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
3282    /// when `red.config.secret.auto_encrypt = true` (default).
3283    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3284        match value {
3285            Value::Password(marked) => {
3286                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3287                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
3288                } else {
3289                    Ok(Value::Password(marked))
3290                }
3291            }
3292            Value::Secret(bytes) => {
3293                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3294                    if !self.secret_auto_encrypt() {
3295                        return Err(RedDBError::Query(
3296                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
3297                             is false. Insert pre-encrypted bytes directly instead."
3298                                .to_string(),
3299                        ));
3300                    }
3301                    let key = self.secret_aes_key().ok_or_else(|| {
3302                        RedDBError::Query(
3303                            "SECRET() column encryption requires a bootstrapped \
3304                             vault (red.secret.aes_key is missing). Start the server \
3305                             with --vault to enable."
3306                                .to_string(),
3307                        )
3308                    })?;
3309                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3310                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3311                } else {
3312                    Ok(Value::Secret(bytes))
3313                }
3314            }
3315            other => Ok(other),
3316        }
3317    }
3318}
3319
3320/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
3321/// This is the on-disk representation of `Value::Secret`.
3322fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3323    let nonce_bytes = crate::auth::store::random_bytes(12);
3324    let mut nonce = [0u8; 12];
3325    nonce.copy_from_slice(&nonce_bytes[..12]);
3326    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3327    let mut out = Vec::with_capacity(12 + ct.len());
3328    out.extend_from_slice(&nonce);
3329    out.extend_from_slice(&ct);
3330    out
3331}
3332
3333/// Decode a `Value::Secret` payload back to plaintext. Returns
3334/// `None` when the payload is too short or AES-GCM authentication
3335/// fails (tampered or wrong key).
3336pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3337    if payload.len() < 12 {
3338        return None;
3339    }
3340    let mut nonce = [0u8; 12];
3341    nonce.copy_from_slice(&payload[..12]);
3342    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3343}
3344
3345fn split_insert_metadata(
3346    runtime: &RedDBRuntime,
3347    columns: &[String],
3348    values: &[Value],
3349) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3350    let mut fields = Vec::new();
3351    let mut metadata = Vec::new();
3352
3353    for (column, value) in columns.iter().zip(values.iter()) {
3354        // Still support legacy _ttl columns for backward compat
3355        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3356            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3357            let (canonical_key, canonical_value) =
3358                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3359            metadata.push((canonical_key.to_string(), canonical_value));
3360            continue;
3361        }
3362        fields.push((
3363            column.clone(),
3364            runtime.resolve_crypto_sentinel(value.clone())?,
3365        ));
3366    }
3367
3368    Ok((fields, metadata))
3369}
3370
3371/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
3372fn merge_with_clauses(
3373    metadata: &mut Vec<(String, MetadataValue)>,
3374    ttl_ms: Option<u64>,
3375    expires_at_ms: Option<u64>,
3376    with_metadata: &[(String, Value)],
3377) {
3378    if let Some(ms) = ttl_ms {
3379        metadata.push((
3380            "_ttl_ms".to_string(),
3381            if ms <= i64::MAX as u64 {
3382                MetadataValue::Int(ms as i64)
3383            } else {
3384                MetadataValue::Timestamp(ms)
3385            },
3386        ));
3387    }
3388    if let Some(ms) = expires_at_ms {
3389        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3390    }
3391    for (key, value) in with_metadata {
3392        let meta_value = match value {
3393            Value::Text(s) => MetadataValue::String(s.to_string()),
3394            Value::Integer(n) => MetadataValue::Int(*n),
3395            Value::Float(n) => MetadataValue::Float(*n),
3396            Value::Boolean(b) => MetadataValue::Bool(*b),
3397            _ => MetadataValue::String(value.to_string()),
3398        };
3399        metadata.push((key.clone(), meta_value));
3400    }
3401}
3402
3403fn merge_vector_metadata_column(
3404    metadata: &mut Vec<(String, MetadataValue)>,
3405    columns: &[String],
3406    values: &[Value],
3407) -> RedDBResult<()> {
3408    let Some(value) = columns
3409        .iter()
3410        .position(|column| column.eq_ignore_ascii_case("metadata"))
3411        .map(|index| &values[index])
3412    else {
3413        return Ok(());
3414    };
3415    let json = match value {
3416        Value::Null => return Ok(()),
3417        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3418            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3419        })?,
3420        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3421            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3422        })?,
3423        other => {
3424            return Err(RedDBError::Query(format!(
3425                "column 'metadata' expected JSON object, got {other:?}"
3426            )))
3427        }
3428    };
3429    let parsed = metadata_from_json(&json)?;
3430    for (key, value) in parsed.iter() {
3431        metadata.push((key.clone(), value.clone()));
3432    }
3433    Ok(())
3434}
3435
3436fn apply_collection_default_ttl_metadata(
3437    runtime: &RedDBRuntime,
3438    collection: &str,
3439    metadata: &mut Vec<(String, MetadataValue)>,
3440) {
3441    if has_internal_ttl_metadata(metadata) {
3442        return;
3443    }
3444
3445    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3446        return;
3447    };
3448
3449    metadata.push((
3450        "_ttl_ms".to_string(),
3451        if default_ttl_ms <= i64::MAX as u64 {
3452            MetadataValue::Int(default_ttl_ms as i64)
3453        } else {
3454            MetadataValue::Timestamp(default_ttl_ms)
3455        },
3456    ));
3457}
3458
3459fn ensure_non_tree_reserved_metadata_entries(
3460    metadata: &[(String, MetadataValue)],
3461) -> RedDBResult<()> {
3462    for (key, _) in metadata {
3463        ensure_non_tree_reserved_metadata_key(key)?;
3464    }
3465    Ok(())
3466}
3467
3468fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3469    if key.starts_with(TREE_METADATA_PREFIX) {
3470        return Err(RedDBError::Query(format!(
3471            "metadata key '{}' is reserved for managed trees",
3472            key
3473        )));
3474    }
3475    Ok(())
3476}
3477
3478fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3479    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3480        return Err(RedDBError::Query(format!(
3481            "edge label '{}' is reserved for managed trees",
3482            TREE_CHILD_EDGE_LABEL
3483        )));
3484    }
3485    Ok(())
3486}
3487
3488fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3489    let mut columns = Vec::with_capacity(pairs.len());
3490    let mut values = Vec::with_capacity(pairs.len());
3491
3492    for (column, value) in pairs {
3493        columns.push(column.clone());
3494        values.push(value.clone());
3495    }
3496
3497    (columns, values)
3498}
3499
3500/// Find a required column value and return it as-is.
3501fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3502    for (i, col) in columns.iter().enumerate() {
3503        if col.eq_ignore_ascii_case(name) {
3504            return Ok(values[i].clone());
3505        }
3506    }
3507    Err(RedDBError::Query(format!(
3508        "required column '{name}' not found in INSERT"
3509    )))
3510}
3511
3512/// Find a required column value and coerce to String.
3513fn find_column_value_string(
3514    columns: &[String],
3515    values: &[Value],
3516    name: &str,
3517) -> RedDBResult<String> {
3518    let val = find_column_value(columns, values, name)?;
3519    match val {
3520        Value::Text(s) => Ok(s.to_string()),
3521        Value::Integer(n) => Ok(n.to_string()),
3522        Value::Float(n) => Ok(n.to_string()),
3523        other => Err(RedDBError::Query(format!(
3524            "column '{name}' expected text, got {other:?}"
3525        ))),
3526    }
3527}
3528
3529fn find_document_body_json(
3530    columns: &[String],
3531    values: &[Value],
3532) -> RedDBResult<crate::json::Value> {
3533    let val = find_column_value(columns, values, "body")?;
3534    match val {
3535        Value::Json(bytes) | Value::Blob(bytes) => crate::json::from_slice(&bytes)
3536            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3537        Value::Text(text) => crate::json::from_str(text.as_ref())
3538            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3539        Value::Integer(value) => crate::json::from_str(&value.to_string())
3540            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3541        Value::UnsignedInteger(value) => crate::json::from_str(&value.to_string())
3542            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3543        Value::Float(value) => crate::json::from_str(&value.to_string())
3544            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3545        other => Err(RedDBError::Query(format!(
3546            "column 'body' expected JSON body, got {other:?}"
3547        ))),
3548    }
3549}
3550
3551fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3552    let val = find_column_value(columns, values, name)?;
3553    match val {
3554        Value::Float(n) => Ok(n),
3555        Value::Integer(n) => Ok(n as f64),
3556        Value::UnsignedInteger(n) => Ok(n as f64),
3557        Value::Text(s) => s
3558            .parse::<f64>()
3559            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3560        other => Err(RedDBError::Query(format!(
3561            "column '{name}' expected number, got {other:?}"
3562        ))),
3563    }
3564}
3565
3566/// Find an optional column value as String.
3567fn find_column_value_opt_string(
3568    columns: &[String],
3569    values: &[Value],
3570    name: &str,
3571) -> Option<String> {
3572    for (i, col) in columns.iter().enumerate() {
3573        if col.eq_ignore_ascii_case(name) {
3574            return match &values[i] {
3575                Value::Null => None,
3576                Value::Text(s) => Some(s.to_string()),
3577                Value::Integer(n) => Some(n.to_string()),
3578                Value::Float(n) => Some(n.to_string()),
3579                _ => None,
3580            };
3581        }
3582    }
3583    None
3584}
3585
3586/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
3587///
3588/// Accepts integer literals, decimal strings, and node labels resolved via
3589/// the per-collection graph label index (same source of truth that
3590/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
3591/// labels error so callers can fall back to the numeric id form.
3592fn resolve_edge_endpoint(
3593    store: &crate::storage::unified::UnifiedStore,
3594    collection: &str,
3595    columns: &[String],
3596    values: &[Value],
3597    name: &str,
3598) -> RedDBResult<u64> {
3599    let val = find_column_value(columns, values, name)?;
3600    match val {
3601        Value::Integer(n) => Ok(n as u64),
3602        Value::UnsignedInteger(n) => Ok(n),
3603        Value::Text(s) => {
3604            if let Ok(n) = s.parse::<u64>() {
3605                return Ok(n);
3606            }
3607            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3608            match matches.len() {
3609                0 => Err(RedDBError::Query(format!(
3610                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3611                ))),
3612                1 => Ok(matches[0].raw()),
3613                n => Err(RedDBError::Query(format!(
3614                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3615                ))),
3616            }
3617        }
3618        other => Err(RedDBError::Query(format!(
3619            "column '{name}' expected integer or node label, got {other:?}"
3620        ))),
3621    }
3622}
3623
3624fn resolve_edge_endpoint_any(
3625    store: &crate::storage::unified::UnifiedStore,
3626    collection: &str,
3627    columns: &[String],
3628    values: &[Value],
3629    names: &[&str],
3630) -> RedDBResult<u64> {
3631    for name in names {
3632        if columns
3633            .iter()
3634            .any(|column| column.eq_ignore_ascii_case(name))
3635        {
3636            return resolve_edge_endpoint(store, collection, columns, values, name);
3637        }
3638    }
3639
3640    Err(RedDBError::Query(format!(
3641        "required column '{}' not found in INSERT",
3642        names.first().copied().unwrap_or("from_rid")
3643    )))
3644}
3645
3646/// Find a required column value and coerce to u64.
3647fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3648    let val = find_column_value(columns, values, name)?;
3649    match val {
3650        Value::Integer(n) => Ok(n as u64),
3651        Value::UnsignedInteger(n) => Ok(n),
3652        Value::Text(s) => s
3653            .parse::<u64>()
3654            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3655        other => Err(RedDBError::Query(format!(
3656            "column '{name}' expected integer, got {other:?}"
3657        ))),
3658    }
3659}
3660
3661/// Find an optional column value as f32.
3662fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3663    for (i, col) in columns.iter().enumerate() {
3664        if col.eq_ignore_ascii_case(name) {
3665            return match &values[i] {
3666                Value::Float(n) => Some(*n as f32),
3667                Value::Integer(n) => Some(*n as f32),
3668                Value::Null => None,
3669                _ => None,
3670            };
3671        }
3672    }
3673    None
3674}
3675
3676/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
3677fn find_column_value_vec_f32(
3678    columns: &[String],
3679    values: &[Value],
3680    name: &str,
3681) -> RedDBResult<Vec<f32>> {
3682    let val = find_column_value(columns, values, name)?;
3683    match val {
3684        Value::Vector(v) => Ok(v),
3685        Value::Json(bytes) => {
3686            // Try to parse as JSON array of numbers
3687            let s = std::str::from_utf8(&bytes).map_err(|_| {
3688                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3689            })?;
3690            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3691                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3692            })?;
3693            Ok(arr)
3694        }
3695        other => Err(RedDBError::Query(format!(
3696            "column '{name}' expected vector, got {other:?}"
3697        ))),
3698    }
3699}
3700
3701fn find_column_value_vec_f32_any(
3702    columns: &[String],
3703    values: &[Value],
3704    names: &[&str],
3705) -> RedDBResult<Vec<f32>> {
3706    for name in names {
3707        if columns
3708            .iter()
3709            .any(|column| column.eq_ignore_ascii_case(name))
3710        {
3711            return find_column_value_vec_f32(columns, values, name);
3712        }
3713    }
3714    Err(RedDBError::Query(format!(
3715        "required vector column '{}' not found in INSERT",
3716        names.join("' or '")
3717    )))
3718}
3719
3720/// Extract remaining properties (all columns not in the exclusion list).
3721fn extract_remaining_properties(
3722    columns: &[String],
3723    values: &[Value],
3724    exclude: &[&str],
3725) -> Vec<(String, Value)> {
3726    columns
3727        .iter()
3728        .zip(values.iter())
3729        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3730        .map(|(col, val)| (col.clone(), val.clone()))
3731        .collect()
3732}
3733
3734fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3735    let mut invalid = Vec::new();
3736    for column in columns {
3737        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3738            invalid.push(column.clone());
3739        }
3740    }
3741
3742    if invalid.is_empty() {
3743        Ok(())
3744    } else {
3745        Err(RedDBError::Query(format!(
3746            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3747            invalid.join(", ")
3748        )))
3749    }
3750}
3751
/// Report whether `column` is one of the column names a timeseries INSERT
/// understands. The comparison is ASCII case-insensitive.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        // Analytics-event extension (#577): an analytics row carries
        // an `event_name` + JSON `payload`. The payload is validated
        // against the AnalyticsSchemaRegistry inside
        // `insert_timeseries_point` before the row lands.
        "event_name",
        "payload",
    ];
    ACCEPTED
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
3769
3770fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3771    let mut found = None;
3772
3773    for alias in ["timestamp_ns", "timestamp", "time"] {
3774        for (index, column) in columns.iter().enumerate() {
3775            if !column.eq_ignore_ascii_case(alias) {
3776                continue;
3777            }
3778
3779            if found.is_some() {
3780                return Err(RedDBError::Query(
3781                    "timeseries INSERT accepts only one timestamp column".to_string(),
3782                ));
3783            }
3784
3785            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3786        }
3787    }
3788
3789    Ok(found)
3790}
3791
3792fn find_timeseries_tags(
3793    columns: &[String],
3794    values: &[Value],
3795) -> RedDBResult<std::collections::HashMap<String, String>> {
3796    for (index, column) in columns.iter().enumerate() {
3797        if column.eq_ignore_ascii_case("tags") {
3798            return parse_timeseries_tags(&values[index]);
3799        }
3800    }
3801    Ok(std::collections::HashMap::new())
3802}
3803
3804fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3805    match value {
3806        Value::Null => Ok(std::collections::HashMap::new()),
3807        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3808        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3809        other => Err(RedDBError::Query(format!(
3810            "timeseries tags must be a JSON object or JSON text, got {other:?}"
3811        ))),
3812    }
3813}
3814
3815fn parse_timeseries_tags_json(
3816    bytes: &[u8],
3817) -> RedDBResult<std::collections::HashMap<String, String>> {
3818    let json: crate::json::Value = crate::json::from_slice(bytes)
3819        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3820
3821    let object = match json {
3822        crate::json::Value::Object(object) => object,
3823        other => {
3824            return Err(RedDBError::Query(format!(
3825                "timeseries tags must be a JSON object, got {other:?}"
3826            )))
3827        }
3828    };
3829
3830    let mut tags = std::collections::HashMap::with_capacity(object.len());
3831    for (key, value) in object {
3832        tags.insert(key, json_tag_value_to_string(&value));
3833    }
3834    Ok(tags)
3835}
3836
3837/// Encode a tag value for storage so the original JSON type can be
3838/// recovered on read (issue #543).
3839///
3840/// Time-series tags are stored as `HashMap<String, String>` on the
3841/// physical record (see [`crate::storage::TimeSeriesData`]) so that
3842/// the segment codec, WAL and gRPC mirrors don't need a new value
3843/// variant. To preserve the original JSON type across that
3844/// string-only channel we prepend the
3845/// [`crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX`] marker
3846/// and serialize the value as compact JSON text. The read paths
3847/// (`timeseries_tags_json_value` / `timeseries_tags_value`) detect
3848/// the marker, parse the suffix, and recover a real JSON value.
3849/// Tags written through other channels (Prometheus remote write,
3850/// metrics handlers, legacy on-disk data) lack the marker and are
3851/// returned as `JsonValue::String(raw)` exactly as before.
3852fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3853    let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3854    buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3855    buf.push_str(&value.to_string_compact());
3856    buf
3857}
3858
3859fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3860    match value {
3861        Value::UnsignedInteger(value) => Ok(*value),
3862        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3863        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3864        Value::Text(value) => value.parse::<u64>().map_err(|_| {
3865            RedDBError::Query(format!(
3866                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3867            ))
3868        }),
3869        other => Err(RedDBError::Query(format!(
3870            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3871        ))),
3872    }
3873}
3874
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// A clock reading before the epoch yields `0`; a nanosecond count that does
/// not fit in `u64` is clamped to `u64::MAX`.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
3882
3883fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3884    use crate::json::{Map, Value as JV};
3885    match value {
3886        MetadataValue::Null => JV::Null,
3887        MetadataValue::Bool(value) => JV::Bool(*value),
3888        MetadataValue::Int(value) => JV::Number(*value as f64),
3889        MetadataValue::Float(value) => JV::Number(*value),
3890        MetadataValue::String(value) => JV::String(value.clone()),
3891        MetadataValue::Bytes(value) => JV::Array(
3892            value
3893                .iter()
3894                .map(|value| JV::Number(*value as f64))
3895                .collect(),
3896        ),
3897        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3898        MetadataValue::Array(values) => {
3899            JV::Array(values.iter().map(metadata_value_to_json).collect())
3900        }
3901        MetadataValue::Object(object) => {
3902            let entries = object
3903                .iter()
3904                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3905                .collect();
3906            JV::Object(entries)
3907        }
3908        MetadataValue::Geo { lat, lon } => {
3909            let mut object = Map::new();
3910            object.insert("lat".to_string(), JV::Number(*lat));
3911            object.insert("lon".to_string(), JV::Number(*lon));
3912            JV::Object(object)
3913        }
3914        MetadataValue::Reference(target) => {
3915            let mut object = Map::new();
3916            object.insert(
3917                "collection".to_string(),
3918                JV::String(target.collection().to_string()),
3919            );
3920            object.insert(
3921                "entity_id".to_string(),
3922                JV::Number(target.entity_id().raw() as f64),
3923            );
3924            JV::Object(object)
3925        }
3926        MetadataValue::References(values) => {
3927            let refs = values
3928                .iter()
3929                .map(|target| {
3930                    let mut object = Map::new();
3931                    object.insert(
3932                        "collection".to_string(),
3933                        JV::String(target.collection().to_string()),
3934                    );
3935                    object.insert(
3936                        "entity_id".to_string(),
3937                        JV::Number(target.entity_id().raw() as f64),
3938                    );
3939                    JV::Object(object)
3940                })
3941                .collect();
3942            JV::Array(refs)
3943        }
3944    }
3945}
3946
3947fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3948    match value {
3949        Value::Null => MetadataValue::Null,
3950        Value::Boolean(value) => MetadataValue::Bool(*value),
3951        Value::Integer(value) => MetadataValue::Int(*value),
3952        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3953        Value::Float(value) => MetadataValue::Float(*value),
3954        Value::Text(value) => MetadataValue::String(value.to_string()),
3955        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3956        Value::Timestamp(value) => {
3957            if *value >= 0 {
3958                metadata_u64_to_value(*value as u64)
3959            } else {
3960                MetadataValue::Int(*value)
3961            }
3962        }
3963        Value::TimestampMs(value) => {
3964            if *value >= 0 {
3965                metadata_u64_to_value(*value as u64)
3966            } else {
3967                MetadataValue::Int(*value)
3968            }
3969        }
3970        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3971        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3972        Value::Date(value) => MetadataValue::String(value.to_string()),
3973        Value::Time(value) => MetadataValue::String(value.to_string()),
3974        Value::Decimal(value) => MetadataValue::String(value.to_string()),
3975        Value::Ipv4(value) => MetadataValue::String(format!(
3976            "{}.{}.{}.{}",
3977            (value >> 24) & 0xFF,
3978            (value >> 16) & 0xFF,
3979            (value >> 8) & 0xFF,
3980            value & 0xFF
3981        )),
3982        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3983        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3984        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3985        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3986            lat: *lat as f64 / 1_000_000.0,
3987            lon: *lon as f64 / 1_000_000.0,
3988        },
3989        Value::BigInt(value) => MetadataValue::String(value.to_string()),
3990        Value::TableRef(value) => MetadataValue::String(value.clone()),
3991        Value::PageRef(value) => MetadataValue::Int(*value as i64),
3992        Value::Password(value) => MetadataValue::String(value.clone()),
3993        Value::Array(values) => {
3994            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3995        }
3996        _ => MetadataValue::String(value.to_string()),
3997    }
3998}
3999
4000fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
4001    match value {
4002        Value::Null => Ok(MetadataValue::Null),
4003        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
4004        Value::Integer(_) => Err(RedDBError::Query(format!(
4005            "column '{field}' must be non-negative for TTL metadata"
4006        ))),
4007        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
4008        Value::Float(value) if value.is_finite() => {
4009            if value.fract().abs() >= f64::EPSILON {
4010                return Err(RedDBError::Query(format!(
4011                    "column '{field}' must be an integer (TTL metadata must be an integer)"
4012                )));
4013            }
4014            if *value < 0.0 {
4015                return Err(RedDBError::Query(format!(
4016                    "column '{field}' must be non-negative for TTL metadata"
4017                )));
4018            }
4019            if *value > u64::MAX as f64 {
4020                return Err(RedDBError::Query(format!(
4021                    "column '{field}' value is too large"
4022                )));
4023            }
4024            Ok(metadata_u64_to_value(*value as u64))
4025        }
4026        Value::Float(_) => Err(RedDBError::Query(format!(
4027            "column '{field}' must be a finite number"
4028        ))),
4029        Value::Text(value) => {
4030            let value = value.trim();
4031            if let Ok(value) = value.parse::<u64>() {
4032                Ok(metadata_u64_to_value(value))
4033            } else if let Ok(value) = value.parse::<i64>() {
4034                if value < 0 {
4035                    return Err(RedDBError::Query(format!(
4036                        "column '{field}' must be non-negative for TTL metadata"
4037                    )));
4038                }
4039                Ok(metadata_u64_to_value(value as u64))
4040            } else if let Ok(value) = value.parse::<f64>() {
4041                if !value.is_finite() {
4042                    return Err(RedDBError::Query(format!(
4043                        "column '{field}' must be a finite number"
4044                    )));
4045                }
4046                if value.fract().abs() >= f64::EPSILON {
4047                    return Err(RedDBError::Query(format!(
4048                        "column '{field}' must be an integer (TTL metadata must be an integer)"
4049                    )));
4050                }
4051                if value < 0.0 {
4052                    return Err(RedDBError::Query(format!(
4053                        "column '{field}' must be non-negative for TTL metadata"
4054                    )));
4055                }
4056                if value > u64::MAX as f64 {
4057                    return Err(RedDBError::Query(format!(
4058                        "column '{field}' value is too large"
4059                    )));
4060                }
4061                Ok(metadata_u64_to_value(value as u64))
4062            } else {
4063                Err(RedDBError::Query(format!(
4064                    "column '{field}' expects a numeric value for TTL metadata"
4065                )))
4066            }
4067        }
4068        _ => Err(RedDBError::Query(format!(
4069            "column '{field}' expects a numeric value for TTL metadata"
4070        ))),
4071    }
4072}
4073
4074fn metadata_u64_to_value(value: u64) -> MetadataValue {
4075    if value <= i64::MAX as u64 {
4076        MetadataValue::Int(value as i64)
4077    } else {
4078        MetadataValue::Timestamp(value)
4079    }
4080}
4081
4082/// Phase 2 PG parity: inspect a column value and return `true` when
4083/// the dotted `tail` path is already present under it. Used by the
4084/// tenant auto-fill so rows that already carry an explicit value
4085/// (bulk import, admin insert on behalf of a tenant) are not
4086/// double-stamped with the session's current_tenant().
4087fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
4088    let json = match value {
4089        Value::Null => return false,
4090        Value::Json(bytes) | Value::Blob(bytes) => {
4091            match crate::json::from_slice::<crate::json::Value>(bytes) {
4092                Ok(v) => v,
4093                Err(_) => return false,
4094            }
4095        }
4096        Value::Text(s) => {
4097            let trimmed = s.trim_start();
4098            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
4099                return false;
4100            }
4101            match crate::json::from_str::<crate::json::Value>(s) {
4102                Ok(v) => v,
4103                Err(_) => return false,
4104            }
4105        }
4106        _ => return false,
4107    };
4108    let mut cursor = &json;
4109    for seg in tail.split('.') {
4110        match cursor {
4111            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
4112                Some((_, v)) => cursor = v,
4113                None => return false,
4114            },
4115            _ => return false,
4116        }
4117    }
4118    !matches!(cursor, crate::json::Value::Null)
4119}
4120
4121/// Phase 2 PG parity: take a column value (possibly Null / Text /
4122/// Json) and return a `Value::Json` with the dotted `tail` path set
4123/// to `tenant_id`. Preserves every pre-existing key.
4124///
4125/// Accepts:
4126/// * `Value::Null`  → fresh `{tail: tenant_id}` object
4127/// * `Value::Json(bytes)` → parse, navigate / create path, re-serialize
4128/// * `Value::text(s)` if `s` is valid JSON → same as Json
4129/// * anything else → error (user supplied a scalar where we need
4130///   a JSON container)
4131fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
4132    let mut root = match current {
4133        Value::Null => crate::json::Value::Object(Default::default()),
4134        Value::Json(bytes) | Value::Blob(bytes) => {
4135            crate::json::from_slice(&bytes).map_err(|err| {
4136                RedDBError::Query(format!(
4137                    "tenant auto-fill: root column is not valid JSON ({err})"
4138                ))
4139            })?
4140        }
4141        Value::Text(s) => {
4142            if s.trim().is_empty() {
4143                crate::json::Value::Object(Default::default())
4144            } else {
4145                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
4146                    RedDBError::Query(format!(
4147                        "tenant auto-fill: text root is not valid JSON ({err})"
4148                    ))
4149                })?
4150            }
4151        }
4152        other => {
4153            return Err(RedDBError::Query(format!(
4154                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
4155            )));
4156        }
4157    };
4158
4159    // Navigate path segments, creating intermediate objects on demand.
4160    let segments: Vec<&str> = tail.split('.').collect();
4161    let mut cursor: &mut crate::json::Value = &mut root;
4162    for (i, seg) in segments.iter().enumerate() {
4163        let is_last = i + 1 == segments.len();
4164        let map = match cursor {
4165            crate::json::Value::Object(m) => m,
4166            _ => {
4167                return Err(RedDBError::Query(format!(
4168                    "tenant auto-fill: segment '{seg}' is not inside an object"
4169                )));
4170            }
4171        };
4172        if is_last {
4173            map.insert(
4174                seg.to_string(),
4175                crate::json::Value::String(tenant_id.to_string()),
4176            );
4177            break;
4178        }
4179        cursor = map
4180            .entry(seg.to_string())
4181            .or_insert_with(|| crate::json::Value::Object(Default::default()));
4182    }
4183
4184    let bytes = crate::json::to_vec(&root).map_err(|err| {
4185        RedDBError::Query(format!(
4186            "tenant auto-fill: failed to re-serialize JSON ({err})"
4187        ))
4188    })?;
4189    Ok(Value::Json(bytes))
4190}
4191
4192#[cfg(test)]
4193mod tests {
4194    use crate::storage::schema::Value;
4195    use crate::storage::wal::{WalReader, WalRecord};
4196    use crate::storage::{DeployProfile, StoragePackaging, StorageProfileSelection};
4197    use crate::{RedDBOptions, RedDBRuntime};
4198    use std::path::Path;
4199
4200    fn persistent_operational_options(path: &Path) -> RedDBOptions {
4201        RedDBOptions::persistent(path)
4202            .with_storage_profile(StorageProfileSelection {
4203                deploy_profile: DeployProfile::Embedded,
4204                packaging: StoragePackaging::OperationalDirectory,
4205                replica_count: 0,
4206                managed_backup: false,
4207                wal_retention: false,
4208            })
4209            .unwrap()
4210    }
4211
4212    fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4213        WalReader::open(wal_path)
4214            .expect("wal opens")
4215            .iter()
4216            .map(|record| record.expect("wal record decodes").1)
4217            .filter_map(|record| match record {
4218                WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4219                _ => None,
4220            })
4221            .collect()
4222    }
4223
4224    fn action_contains_text(action: &[u8], needle: &str) -> bool {
4225        action
4226            .windows(needle.len())
4227            .any(|window| window == needle.as_bytes())
4228    }
4229
4230    fn assert_statement_writes_collections_in_one_new_wal_batch(
4231        rt: &RedDBRuntime,
4232        wal_path: &Path,
4233        statement: &str,
4234        source: &str,
4235        event_queue: &str,
4236    ) {
4237        let before_batches = store_commit_batches(wal_path).len();
4238
4239        rt.execute_query(statement).unwrap();
4240
4241        let batches = store_commit_batches(wal_path);
4242        let statement_batches = &batches[before_batches..];
4243        let source_batch = statement_batches
4244            .iter()
4245            .position(|actions| {
4246                actions.iter().any(|action| {
4247                    action_contains_text(action, source)
4248                        && !action_contains_text(action, event_queue)
4249                })
4250            })
4251            .expect("source collection write batch is present");
4252        let event_batch = statement_batches
4253            .iter()
4254            .position(|actions| {
4255                actions
4256                    .iter()
4257                    .any(|action| action_contains_text(action, event_queue))
4258            })
4259            .expect("event queue write batch is present");
4260
4261        assert_eq!(
4262            source_batch, event_batch,
4263            "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4264        );
4265    }
4266
4267    #[test]
4268    fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4269        let dir = tempfile::tempdir().unwrap();
4270        let db_path = dir.path().join("events_dual_write.rdb");
4271        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4272        let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4273
4274        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4275            .unwrap();
4276        assert_statement_writes_collections_in_one_new_wal_batch(
4277            &rt,
4278            &wal_path,
4279            "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4280            "users",
4281            "users_events",
4282        );
4283    }
4284
4285    #[test]
4286    fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4287        let dir = tempfile::tempdir().unwrap();
4288        let db_path = dir.path().join("events_update_atomic.rdb");
4289        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4290        let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4291
4292        rt.execute_query(
4293            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4294        )
4295        .unwrap();
4296        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4297            .unwrap();
4298
4299        assert_statement_writes_collections_in_one_new_wal_batch(
4300            &rt,
4301            &wal_path,
4302            "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4303            "users",
4304            "user_updates",
4305        );
4306    }
4307
4308    #[test]
4309    fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4310        let dir = tempfile::tempdir().unwrap();
4311        let db_path = dir.path().join("events_delete_atomic.rdb");
4312        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4313        let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4314
4315        rt.execute_query(
4316            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4317        )
4318        .unwrap();
4319        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4320            .unwrap();
4321
4322        assert_statement_writes_collections_in_one_new_wal_batch(
4323            &rt,
4324            &wal_path,
4325            "DELETE FROM users WHERE id = 1",
4326            "users",
4327            "user_deletes",
4328        );
4329    }
4330
4331    #[test]
4332    fn update_where_id_in_with_hash_index_updates_expected_rows() {
4333        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4334        rt.execute_query("CREATE TABLE users (id INT, score INT)")
4335            .unwrap();
4336        for id in 0..5 {
4337            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4338                .unwrap();
4339        }
4340        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4341            .unwrap();
4342
4343        let updated = rt
4344            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4345            .unwrap();
4346        assert_eq!(updated.affected_rows, 3);
4347
4348        let selected = rt
4349            .execute_query("SELECT id, score FROM users ORDER BY id")
4350            .unwrap();
4351        let scores: Vec<(i64, i64)> = selected
4352            .result
4353            .records
4354            .iter()
4355            .map(|record| {
4356                let id = match record.get("id").unwrap() {
4357                    Value::Integer(value) => *value,
4358                    other => panic!("expected integer id, got {other:?}"),
4359                };
4360                let score = match record.get("score").unwrap() {
4361                    Value::Integer(value) => *value,
4362                    other => panic!("expected integer score, got {other:?}"),
4363                };
4364                (id, score)
4365            })
4366            .collect();
4367        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4368    }
4369
4370    /// Drives UPDATE through the shared `DmlTargetScan` module — the
4371    /// same code path DELETE uses (#51, #52). Exercises the indexed
4372    /// equality fast-path (WHERE id = N with a HASH index), the
4373    /// unindexed range scan (WHERE score > N), and the no-WHERE
4374    /// full-scan branch to confirm the extracted "find target rows"
4375    /// loop preserves affected-row counts and the resulting row state.
4376    #[test]
4377    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4378        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4379        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4380            .unwrap();
4381        for id in 0..5 {
4382            rt.execute_query(&format!(
4383                "INSERT INTO items (id, score) VALUES ({id}, {})",
4384                id * 10
4385            ))
4386            .unwrap();
4387        }
4388        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4389            .unwrap();
4390
4391        // Indexed equality UPDATE — hits the hash fast-path inside
4392        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
4393        // below the score>25 cutoff so the next assertion stays clean.
4394        let updated_one = rt
4395            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4396            .unwrap();
4397        assert_eq!(updated_one.affected_rows, 1);
4398
4399        // Unindexed scan UPDATE — bumps everyone with score > 25,
4400        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4401        // zoned/full-scan branch.
4402        let updated_many = rt
4403            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4404            .unwrap();
4405        assert_eq!(updated_many.affected_rows, 2);
4406
4407        let snapshot = rt
4408            .execute_query("SELECT id, score FROM items ORDER BY id")
4409            .unwrap();
4410        let pairs: Vec<(i64, i64)> = snapshot
4411            .result
4412            .records
4413            .iter()
4414            .map(|record| {
4415                let id = match record.get("id").unwrap() {
4416                    Value::Integer(value) => *value,
4417                    other => panic!("expected integer id, got {other:?}"),
4418                };
4419                let score = match record.get("score").unwrap() {
4420                    Value::Integer(value) => *value,
4421                    other => panic!("expected integer score, got {other:?}"),
4422                };
4423                (id, score)
4424            })
4425            .collect();
4426        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4427
4428        // Full-scan UPDATE with no WHERE rewrites every remaining row.
4429        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4430        assert_eq!(updated_all.affected_rows, 5);
4431        let after = rt
4432            .execute_query("SELECT score FROM items ORDER BY id")
4433            .unwrap();
4434        let scores: Vec<i64> = after
4435            .result
4436            .records
4437            .iter()
4438            .map(|record| match record.get("score").unwrap() {
4439                Value::Integer(value) => *value,
4440                other => panic!("expected integer score, got {other:?}"),
4441            })
4442            .collect();
4443        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4444    }
4445
4446    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
4447    /// both the index fast-path (WHERE id = N with a HASH index) and
4448    /// the unindexed scan path (WHERE score > N) to confirm the
4449    /// extracted "find target rows" loop preserves the affected-row
4450    /// count and which rows survive.
4451    #[test]
4452    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4453        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4454        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4455            .unwrap();
4456        for id in 0..5 {
4457            rt.execute_query(&format!(
4458                "INSERT INTO items (id, score) VALUES ({id}, {})",
4459                id * 10
4460            ))
4461            .unwrap();
4462        }
4463        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4464            .unwrap();
4465
4466        // Indexed equality DELETE — hits the hash fast-path inside
4467        // DmlTargetScan::find_target_ids.
4468        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4469        assert_eq!(deleted_one.affected_rows, 1);
4470
4471        // Unindexed scan DELETE — drops everyone with score > 25,
4472        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4473        // zoned/full-scan branch.
4474        let deleted_many = rt
4475            .execute_query("DELETE FROM items WHERE score > 25")
4476            .unwrap();
4477        assert_eq!(deleted_many.affected_rows, 2);
4478
4479        let surviving = rt
4480            .execute_query("SELECT id FROM items ORDER BY id")
4481            .unwrap();
4482        let ids: Vec<i64> = surviving
4483            .result
4484            .records
4485            .iter()
4486            .map(|record| match record.get("id").unwrap() {
4487                Value::Integer(value) => *value,
4488                other => panic!("expected integer id, got {other:?}"),
4489            })
4490            .collect();
4491        assert_eq!(ids, vec![0, 1]);
4492
4493        // Sanity: full-scan DELETE with no WHERE clears the rest.
4494        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4495        assert_eq!(deleted_rest.affected_rows, 2);
4496        let empty = rt.execute_query("SELECT id FROM items").unwrap();
4497        assert!(empty.result.records.is_empty());
4498    }
4499
4500    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
4501    /// INSERT but reject UPDATE and DELETE with the documented
4502    /// operator-facing error strings. Drives all three DML verbs so
4503    /// the centralized gate is exercised end-to-end.
4504    #[test]
4505    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4506        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4507        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4508            .unwrap();
4509
4510        // INSERT must succeed — APPEND ONLY exists precisely to allow
4511        // appends. The gate should be a no-op for INSERT.
4512        let inserted = rt
4513            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4514            .unwrap();
4515        assert_eq!(inserted.affected_rows, 1);
4516
4517        // UPDATE is rejected with the gate's UPDATE-specific message.
4518        let update_err = rt
4519            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4520            .unwrap_err();
4521        let msg = format!("{update_err}");
4522        assert!(
4523            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4524            "expected UPDATE rejection message, got: {msg}"
4525        );
4526
4527        // DELETE is rejected with the gate's DELETE-specific message.
4528        let delete_err = rt
4529            .execute_query("DELETE FROM events WHERE id = 1")
4530            .unwrap_err();
4531        let msg = format!("{delete_err}");
4532        assert!(
4533            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4534            "expected DELETE rejection message, got: {msg}"
4535        );
4536
4537        // Row should still be present — neither rejected mutation
4538        // touched storage.
4539        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4540        assert_eq!(surviving.result.records.len(), 1);
4541    }
4542
4543    /// CollectionContract gate: tables without an APPEND ONLY contract
4544    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
4545    /// is a true pass-through, not an accidental block.
4546    #[test]
4547    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4548        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4549        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4550            .unwrap();
4551
4552        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4553            .unwrap();
4554        let updated = rt
4555            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4556            .unwrap();
4557        assert_eq!(updated.affected_rows, 1);
4558        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4559        assert_eq!(deleted.affected_rows, 1);
4560    }
4561
4562    #[test]
4563    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4564        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4565        rt.execute_query(
4566            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4567        )
4568        .unwrap();
4569
4570        let inserted = rt
4571            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4572            .unwrap();
4573        assert_eq!(inserted.affected_rows, 1);
4574
4575        let events = queue_payloads(&rt, "audit_log");
4576        assert_eq!(events.len(), 1);
4577        let event = events[0].as_object().expect("event payload object");
4578        assert!(event
4579            .get("event_id")
4580            .and_then(crate::json::Value::as_str)
4581            .is_some_and(|value| !value.is_empty()));
4582        assert_eq!(
4583            event.get("op").and_then(crate::json::Value::as_str),
4584            Some("insert")
4585        );
4586        assert_eq!(
4587            event.get("collection").and_then(crate::json::Value::as_str),
4588            Some("users")
4589        );
4590        assert_eq!(
4591            event.get("id").and_then(crate::json::Value::as_u64),
4592            Some(7)
4593        );
4594        assert!(event
4595            .get("ts")
4596            .and_then(crate::json::Value::as_u64)
4597            .is_some());
4598        assert!(event
4599            .get("lsn")
4600            .and_then(crate::json::Value::as_u64)
4601            .is_some());
4602        assert!(matches!(
4603            event.get("tenant"),
4604            Some(crate::json::Value::Null)
4605        ));
4606        assert!(matches!(
4607            event.get("before"),
4608            Some(crate::json::Value::Null)
4609        ));
4610        let after = event
4611            .get("after")
4612            .and_then(crate::json::Value::as_object)
4613            .expect("after object");
4614        assert_eq!(
4615            after.get("id").and_then(crate::json::Value::as_u64),
4616            Some(7)
4617        );
4618        assert_eq!(
4619            after.get("email").and_then(crate::json::Value::as_str),
4620            Some("a@example.com")
4621        );
4622    }
4623
4624    #[test]
4625    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4626        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4627        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4628            .unwrap();
4629
4630        rt.execute_query(
4631            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4632        )
4633        .unwrap();
4634
4635        let events = queue_payloads(&rt, "users_events");
4636        assert_eq!(events.len(), 2);
4637        let mut previous_lsn = 0;
4638        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4639            let object = event.as_object().expect("event payload object");
4640            assert_eq!(
4641                object.get("op").and_then(crate::json::Value::as_str),
4642                Some("insert")
4643            );
4644            assert_eq!(
4645                object.get("id").and_then(crate::json::Value::as_u64),
4646                Some(expected_id)
4647            );
4648            let lsn = object
4649                .get("lsn")
4650                .and_then(crate::json::Value::as_u64)
4651                .expect("event lsn");
4652            assert!(
4653                lsn > previous_lsn,
4654                "event LSNs should increase in row order"
4655            );
4656            previous_lsn = lsn;
4657            let after = object
4658                .get("after")
4659                .and_then(crate::json::Value::as_object)
4660                .expect("after object");
4661            assert_eq!(
4662                after.get("id").and_then(crate::json::Value::as_u64),
4663                Some(expected_id)
4664            );
4665        }
4666    }
4667
4668    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4669        let result = rt
4670            .execute_query(&format!("QUEUE PEEK {queue} 10"))
4671            .expect("peek queue");
4672        result
4673            .result
4674            .records
4675            .iter()
4676            .map(
4677                |record| match record.get("payload").expect("payload column") {
4678                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4679                    other => panic!("expected JSON queue payload, got {other:?}"),
4680                },
4681            )
4682            .collect()
4683    }
4684
4685    // ── #112: auto-index user `id` on first insert ─────────────────────
4686
4687    /// First insert into a fresh collection that carries a column named
4688    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
4689    /// populate it transparently, and `WHERE id = N` lookups exercise
4690    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
4691    ///
4692    /// This is the load-bearing acceptance test for #112 — without the
4693    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
4694    /// fall through to a full segment scan (the 4× perf gap documented
4695    /// in `docs/perf/delete-sequential-2026-05-06.md`).
4696    #[test]
4697    fn auto_index_id_fires_on_first_insert() {
4698        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4699        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4700            .unwrap();
4701
4702        // Pre-condition: no index on `id` yet.
4703        assert!(
4704            rt.index_store_ref()
4705                .find_index_for_column("bench_users", "id")
4706                .is_none(),
4707            "freshly created collection should not have an `id` index"
4708        );
4709
4710        // Single-row INSERT — drives `MutationEngine::append_one`.
4711        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4712            .unwrap();
4713
4714        // Post-condition: hash index registered on `id`.
4715        let registered = rt
4716            .index_store_ref()
4717            .find_index_for_column("bench_users", "id")
4718            .expect("auto-index hook should have registered idx_id on first insert");
4719        assert_eq!(registered.name, "idx_id");
4720        assert_eq!(registered.collection, "bench_users");
4721        assert_eq!(registered.columns, vec!["id".to_string()]);
4722        assert!(matches!(
4723            registered.method,
4724            super::super::index_store::IndexMethodKind::Hash
4725        ));
4726
4727        // Subsequent inserts populate the index; `WHERE id = N` should
4728        // resolve via the hash fast path and round-trip every row.
4729        for id in 2..=5 {
4730            rt.execute_query(&format!(
4731                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4732                id * 10
4733            ))
4734            .unwrap();
4735        }
4736        for id in 1..=5 {
4737            let result = rt
4738                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4739                .unwrap();
4740            assert_eq!(
4741                result.result.records.len(),
4742                1,
4743                "id={id} should match one row"
4744            );
4745        }
4746
4747        // Delete via the hash fast-path — exactly the bench scenario the
4748        // perf doc identified as the 4× regression. With the index
4749        // present, `find_target_ids` short-circuits before
4750        // `for_each_entity_zoned` runs.
4751        let deleted = rt
4752            .execute_query("DELETE FROM bench_users WHERE id = 3")
4753            .unwrap();
4754        assert_eq!(deleted.affected_rows, 1);
4755    }
4756
4757    /// Bulk INSERT (the multi-row VALUES path) drives
4758    /// `MutationEngine::append_batch`. The hook must fire there too —
4759    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
4760    /// wire bulk INSERT) skip auto-indexing entirely.
4761    #[test]
4762    fn auto_index_id_fires_on_first_bulk_insert() {
4763        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4764        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4765            .unwrap();
4766
4767        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4768            .unwrap();
4769
4770        let registered = rt
4771            .index_store_ref()
4772            .find_index_for_column("bench_bulk", "id")
4773            .expect("auto-index hook should fire on first bulk insert");
4774        assert_eq!(registered.name, "idx_id");
4775
4776        // Every row populated via `index_entity_insert_batch`.
4777        for id in 1..=3 {
4778            let result = rt
4779                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4780                .unwrap();
4781            assert_eq!(result.result.records.len(), 1);
4782        }
4783    }
4784
4785    /// Hook is a no-op when the row carries no `id` column. Conservative
4786    /// match (case-sensitive `id`) — `Id`, `ID`, and `rid`
4787    /// don't trigger it.
4788    #[test]
4789    fn auto_index_id_skips_when_no_id_column() {
4790        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4791        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4792            .unwrap();
4793        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4794            .unwrap();
4795
4796        assert!(rt
4797            .index_store_ref()
4798            .find_index_for_column("plain", "id")
4799            .is_none());
4800        assert!(rt
4801            .index_store_ref()
4802            .find_index_for_column("plain", "uid")
4803            .is_none());
4804    }
4805
4806    /// Hook only fires once per collection. If an explicit
4807    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
4808    /// detects it via `find_index_for_column` and does NOT clobber it
4809    /// with a HASH index on the next insert.
4810    #[test]
4811    fn auto_index_id_skips_when_index_already_exists() {
4812        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4813        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4814            .unwrap();
4815        // User-declared BTREE index on `id` before any insert.
4816        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4817            .unwrap();
4818        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4819            .unwrap();
4820
4821        let registered = rt
4822            .index_store_ref()
4823            .find_index_for_column("pre", "id")
4824            .expect("user index should still be there");
4825        assert_eq!(
4826            registered.name, "user_idx",
4827            "auto-index hook must not overwrite an existing index"
4828        );
4829    }
4830
4831    /// Implicit `idx_id` is reaped when the collection drops. The
4832    /// existing `execute_drop_table` walks `list_indices` and drops every
4833    /// entry — confirm the auto-created index participates.
4834    #[test]
4835    fn auto_index_id_dropped_with_collection() {
4836        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4837        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4838            .unwrap();
4839        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4840            .unwrap();
4841        assert!(rt
4842            .index_store_ref()
4843            .find_index_for_column("ephemeral", "id")
4844            .is_some());
4845
4846        rt.execute_query("DROP TABLE ephemeral").unwrap();
4847
4848        assert!(
4849            rt.index_store_ref()
4850                .find_index_for_column("ephemeral", "id")
4851                .is_none(),
4852            "implicit `idx_id` must be reaped when its collection drops"
4853        );
4854    }
4855
4856    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
4857    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
4858    /// off, first insert leaves the collection without an `id` index —
4859    /// DELETE/UPDATE fall back to the scan path.
4860    #[test]
4861    fn auto_index_id_disabled_by_config() {
4862        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4863        let rt = RedDBRuntime::with_options(opts).unwrap();
4864
4865        rt.execute_query("CREATE TABLE off (id INT, score INT)")
4866            .unwrap();
4867        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4868            .unwrap();
4869
4870        assert!(
4871            rt.index_store_ref()
4872                .find_index_for_column("off", "id")
4873                .is_none(),
4874            "with auto_index_id=false, no implicit index should be created"
4875        );
4876    }
4877
4878    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
4879
4880    #[test]
4881    fn update_single_row_emits_update_event() {
4882        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4883        rt.execute_query(
4884            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4885        )
4886        .unwrap();
4887        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4888            .unwrap();
4889
4890        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4891            .unwrap();
4892
4893        let events = queue_payloads(&rt, "audit_log");
4894        assert_eq!(events.len(), 1, "expected exactly 1 update event");
4895        let event = events[0].as_object().expect("event payload object");
4896        assert_eq!(
4897            event.get("op").and_then(crate::json::Value::as_str),
4898            Some("update")
4899        );
4900        assert_eq!(
4901            event.get("collection").and_then(crate::json::Value::as_str),
4902            Some("users")
4903        );
4904        assert!(event
4905            .get("event_id")
4906            .and_then(crate::json::Value::as_str)
4907            .is_some_and(|v| !v.is_empty()));
4908        let before = event
4909            .get("before")
4910            .and_then(crate::json::Value::as_object)
4911            .expect("before must be an object");
4912        let after = event
4913            .get("after")
4914            .and_then(crate::json::Value::as_object)
4915            .expect("after must be an object");
4916        assert_eq!(
4917            before.get("name").and_then(crate::json::Value::as_str),
4918            Some("Alice"),
4919            "before.name should be the old value"
4920        );
4921        assert_eq!(
4922            after.get("name").and_then(crate::json::Value::as_str),
4923            Some("Bob"),
4924            "after.name should be the new value"
4925        );
4926    }
4927
4928    #[test]
4929    fn update_event_only_includes_changed_fields() {
4930        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4931        rt.execute_query(
4932            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4933        )
4934        .unwrap();
4935        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4936            .unwrap();
4937
4938        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4939            .unwrap();
4940
4941        let events = queue_payloads(&rt, "evts");
4942        assert_eq!(events.len(), 1);
4943        let event = events[0].as_object().unwrap();
4944        let before = event
4945            .get("before")
4946            .and_then(crate::json::Value::as_object)
4947            .unwrap();
4948        let after = event
4949            .get("after")
4950            .and_then(crate::json::Value::as_object)
4951            .unwrap();
4952        // Only changed field included.
4953        assert!(
4954            before.contains_key("name"),
4955            "before must include changed field"
4956        );
4957        assert!(
4958            after.contains_key("name"),
4959            "after must include changed field"
4960        );
4961        // Unchanged fields must not appear.
4962        assert!(
4963            !before.contains_key("email"),
4964            "before must not include unchanged email"
4965        );
4966        assert!(
4967            !after.contains_key("email"),
4968            "after must not include unchanged email"
4969        );
4970    }
4971
4972    #[test]
4973    fn multi_row_update_emits_one_event_per_row() {
4974        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4975        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4976            .unwrap();
4977        rt.execute_query(
4978            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4979        )
4980        .unwrap();
4981
4982        rt.execute_query("UPDATE items SET status = 'done'")
4983            .unwrap();
4984
4985        let events = queue_payloads(&rt, "evts");
4986        assert_eq!(events.len(), 3, "expected one update event per row");
4987        for event in &events {
4988            let obj = event.as_object().unwrap();
4989            assert_eq!(
4990                obj.get("op").and_then(crate::json::Value::as_str),
4991                Some("update")
4992            );
4993        }
4994    }
4995
4996    #[test]
4997    fn delete_single_row_emits_delete_event() {
4998        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4999        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
5000            .unwrap();
5001        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
5002            .unwrap();
5003
5004        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
5005
5006        let events = queue_payloads(&rt, "del_log");
5007        assert_eq!(events.len(), 1);
5008        let event = events[0].as_object().expect("event payload object");
5009        assert_eq!(
5010            event.get("op").and_then(crate::json::Value::as_str),
5011            Some("delete")
5012        );
5013        assert_eq!(
5014            event.get("collection").and_then(crate::json::Value::as_str),
5015            Some("users")
5016        );
5017        assert!(event
5018            .get("event_id")
5019            .and_then(crate::json::Value::as_str)
5020            .is_some_and(|v| !v.is_empty()));
5021        let before = event
5022            .get("before")
5023            .and_then(crate::json::Value::as_object)
5024            .expect("before must be an object for delete");
5025        assert_eq!(
5026            before.get("id").and_then(crate::json::Value::as_u64),
5027            Some(42)
5028        );
5029        assert_eq!(
5030            before.get("name").and_then(crate::json::Value::as_str),
5031            Some("Alice")
5032        );
5033        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
5034    }
5035
5036    #[test]
5037    fn multi_row_delete_emits_one_event_per_row() {
5038        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5039        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
5040            .unwrap();
5041        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
5042            .unwrap();
5043
5044        rt.execute_query("DELETE FROM items").unwrap();
5045
5046        let events = queue_payloads(&rt, "del_log");
5047        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
5048        for event in &events {
5049            let obj = event.as_object().unwrap();
5050            assert_eq!(
5051                obj.get("op").and_then(crate::json::Value::as_str),
5052                Some("delete")
5053            );
5054            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
5055        }
5056    }
5057
5058    #[test]
5059    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
5060        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5061        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
5062            .unwrap();
5063
5064        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5065            .unwrap();
5066        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
5067
5068        let events = queue_payloads(&rt, "evts");
5069        assert!(
5070            events.is_empty(),
5071            "UPDATE-only filter must not emit INSERT or DELETE events"
5072        );
5073    }
5074
5075    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
5076
5077    #[test]
5078    fn suppress_events_on_insert_emits_no_events() {
5079        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5080        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5081            .unwrap();
5082
5083        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5084            .unwrap();
5085
5086        let events = queue_payloads(&rt, "evts");
5087        assert!(
5088            events.is_empty(),
5089            "SUPPRESS EVENTS must prevent INSERT events"
5090        );
5091    }
5092
5093    #[test]
5094    fn suppress_events_on_update_emits_no_events() {
5095        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5096        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5097            .unwrap();
5098        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5099            .unwrap();
5100        // drain the INSERT event
5101        let _ = queue_payloads(&rt, "evts");
5102        // Force pop to drain; simpler: just check new count after UPDATE
5103        rt.execute_query("QUEUE PURGE evts").unwrap();
5104
5105        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
5106            .unwrap();
5107
5108        let events = queue_payloads(&rt, "evts");
5109        assert!(
5110            events.is_empty(),
5111            "SUPPRESS EVENTS must prevent UPDATE events"
5112        );
5113    }
5114
5115    #[test]
5116    fn suppress_events_on_delete_emits_no_events() {
5117        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5118        rt.execute_query(
5119            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
5120        )
5121        .unwrap();
5122        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5123            .unwrap();
5124
5125        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
5126            .unwrap();
5127
5128        let events = queue_payloads(&rt, "evts");
5129        assert!(
5130            events.is_empty(),
5131            "SUPPRESS EVENTS must prevent DELETE events"
5132        );
5133    }
5134
5135    #[test]
5136    fn normal_insert_after_suppress_still_emits() {
5137        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5138        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5139            .unwrap();
5140
5141        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5142            .unwrap();
5143        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
5144            .unwrap();
5145
5146        let events = queue_payloads(&rt, "evts");
5147        assert_eq!(
5148            events.len(),
5149            1,
5150            "only the non-suppressed INSERT should emit"
5151        );
5152        assert_eq!(
5153            events[0].get("id").and_then(crate::json::Value::as_u64),
5154            Some(2)
5155        );
5156    }
5157}