Skip to main content

reddb_server/runtime/
impl_dml.rs

1//! DML execution: INSERT, UPDATE, DELETE via SQL AST
2//!
3//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
4//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
5//! operations through the existing `RuntimeEntityPort` trait so that all
6//! cross-cutting concerns (WAL, indexing, replication) are automatically
7//! applied.
8
9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11    CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12    CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13    RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16    build_row_update_contract_plan, entity_row_fields_snapshot,
17    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18    RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27    sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_rid, sys_key_tenant,
28    sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
/// Chunk size used when applying UPDATE mutations in batches.
const UPDATE_APPLY_CHUNK_SIZE: usize = 2_048;
/// Edge label used for tree child links.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Prefix reserved for tree-related metadata keys.
const TREE_METADATA_PREFIX: &str = "red.tree.";
40
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
43    column: String,
44    expr: Expr,
45    compound_op: Option<BinOp>,
46    metadata_key: Option<&'static str>,
47    row_rule: Option<RowUpdateColumnRule>,
48}
49
50struct CompiledUpdatePlan {
51    static_field_assignments: Vec<(String, Value)>,
52    static_metadata_assignments: Vec<(String, MetadataValue)>,
53    dynamic_assignments: Vec<CompiledUpdateAssignment>,
54    row_contract_plan: Option<RowUpdateContractPlan>,
55    row_modified_columns: Vec<String>,
56    row_touches_unique_columns: bool,
57}
58
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
61    dynamic_field_assignments: Vec<(String, Value)>,
62    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66    /// Issue #524 — public read of the in-memory chain tip. Returns `None`
67    /// when the collection is not a chain or has no rows (pre-genesis). On a
68    /// cold cache the first call falls back to a one-time scan so the HTTP
69    /// `GET /collections/:name/chain-tip` handler stays consistent with the
70    /// INSERT path after a restart.
71    pub fn chain_tip_for_collection(
72        &self,
73        collection: &str,
74    ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75        let store = self.inner.db.store();
76        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77            return None;
78        }
79        let mut cache = self.inner.chain_tip_cache.lock();
80        if let Some(existing) = cache.get(collection) {
81            return Some(existing.clone());
82        }
83        let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84        cache.insert(collection.to_string(), scanned.clone());
85        Some(scanned)
86    }
87
88    /// Issue #525 — walks the chain end-to-end, recomputes each block's hash
89    /// against the stored fields, and returns the verification outcome.  On
90    /// `ok == false` the integrity flag is persisted and the in-memory cache
91    /// is updated so subsequent INSERTs surface `ChainIntegrityBroken`.
92    ///
93    /// Returns `None` when the collection is absent or not a `KIND blockchain`.
94    pub fn verify_chain_for_collection(
95        &self,
96        collection: &str,
97    ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98        let store = self.inner.db.store();
99        let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100        if !outcome.ok {
101            crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102            self.inner
103                .chain_integrity_broken
104                .lock()
105                .insert(collection.to_string(), true);
106        }
107        Some(outcome)
108    }
109
110    /// Issue #525 — admin clears the `ChainIntegrityBroken` flag so the chain
111    /// accepts INSERTs again.  Returns `false` when the collection is not a
112    /// chain.
113    pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114        let store = self.inner.db.store();
115        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116            return false;
117        }
118        crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119        self.inner
120            .chain_integrity_broken
121            .lock()
122            .insert(collection.to_string(), false);
123        true
124    }
125
126    /// Issue #525 — INSERT-time check.  Combines in-memory cache (fast path)
127    /// with a one-time scan of `red_config` on cold start so the flag survives
128    /// restart.
129    fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130        {
131            let cache = self.inner.chain_integrity_broken.lock();
132            if let Some(v) = cache.get(collection) {
133                return *v;
134            }
135        }
136        let store = self.inner.db.store();
137        let persisted =
138            crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139                .unwrap_or(false);
140        self.inner
141            .chain_integrity_broken
142            .lock()
143            .insert(collection.to_string(), persisted);
144        persisted
145    }
146
147    /// Issue #765 / S6 — lazily hydrate the integrity-tombstone cache from
148    /// `red_config` on first access. Returns `true` when at least one
149    /// tombstone range is present. Subsequent calls observe the cached state
150    /// flag (`1` empty / `2` present) and skip the store scan.
151    fn ensure_integrity_tombstones_loaded(&self) -> bool {
152        use std::sync::atomic::Ordering;
153        match self
154            .inner
155            .integrity_tombstones_state
156            .load(Ordering::Relaxed)
157        {
158            1 => return false,
159            2 => return true,
160            _ => {}
161        }
162        // Cold: load under the cache lock so a concurrent reader cannot
163        // observe a half-populated vector.
164        let mut guard = self.inner.integrity_tombstones.lock();
165        if self
166            .inner
167            .integrity_tombstones_state
168            .load(Ordering::Relaxed)
169            == 0
170        {
171            let ranges = crate::runtime::integrity_tombstone::load_ranges(&self.inner.db.store());
172            let present = !ranges.is_empty();
173            *guard = ranges;
174            self.inner
175                .integrity_tombstones_state
176                .store(if present { 2 } else { 1 }, Ordering::Relaxed);
177        }
178        self.inner
179            .integrity_tombstones_state
180            .load(Ordering::Relaxed)
181            == 2
182    }
183
184    /// Issue #765 / S6 — durably record an integrity tombstone over the
185    /// inclusive RID range `[lo, hi]` of `table` (the committed rows of an
186    /// input stream whose end-to-end SHA-256 digest did not match). The range
187    /// is persisted to `red_config` (survives restart) and folded into the
188    /// in-memory cache so the same process filters it immediately.
189    pub fn record_integrity_tombstone(&self, table: &str, lo: u64, hi: u64) {
190        use std::sync::atomic::Ordering;
191        self.ensure_integrity_tombstones_loaded();
192        let mut guard = self.inner.integrity_tombstones.lock();
193        guard.push(crate::runtime::integrity_tombstone::TombstoneRange::new(
194            table.to_string(),
195            lo,
196            hi,
197        ));
198        crate::runtime::integrity_tombstone::persist_ranges(&self.inner.db.store(), &guard);
199        self.inner
200            .integrity_tombstones_state
201            .store(2, Ordering::Relaxed);
202    }
203
204    /// Issue #765 / S6 — snapshot of the currently-cached tombstone ranges.
205    /// Intended for tests and forensic surfaces; the read path uses
206    /// [`Self::filter_integrity_tombstoned`] which avoids the clone.
207    pub fn integrity_tombstone_ranges(
208        &self,
209    ) -> Vec<crate::runtime::integrity_tombstone::TombstoneRange> {
210        self.ensure_integrity_tombstones_loaded();
211        self.inner.integrity_tombstones.lock().clone()
212    }
213
214    /// Issue #765 / S6 — drop tombstoned rows from a SELECT result in place.
215    /// Fast no-op (one relaxed atomic load) when no tombstone has ever been
216    /// recorded. Clears `pre_serialized_json` when any row is removed so the
217    /// fast-path JSON cannot leak a filtered row back onto the wire.
218    pub fn filter_integrity_tombstoned(&self, result: &mut UnifiedResult) {
219        if !self.ensure_integrity_tombstones_loaded() {
220            return;
221        }
222        let guard = self.inner.integrity_tombstones.lock();
223        if guard.is_empty() {
224            return;
225        }
226        let before = result.records.len();
227        result.records.retain(|record| {
228            !crate::runtime::integrity_tombstone::record_tombstoned(&guard, record)
229        });
230        if result.records.len() != before {
231            result.pre_serialized_json = None;
232        }
233    }
234
235    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
236    /// target table is tenant-scoped and the user's column list does
237    /// not already name the tenant column.
238    ///
239    /// Returns:
240    /// * `Ok(None)` — no injection needed (non-tenant table, or user
241    ///   supplied the column explicitly). Caller uses the original
242    ///   query unchanged.
243    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
244    ///   + literal value appended to every row.
245    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
246    ///   the current session. Fails loudly so callers don't produce
247    ///   rows that RLS would then hide on read.
248    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
249        let Some(tenant_col) = self.tenant_column(&query.table) else {
250            return Ok(None);
251        };
252        // User already named the column (literal match) — trust them.
253        if query
254            .columns
255            .iter()
256            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
257        {
258            return Ok(None);
259        }
260
261        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
262        // nested key like `headers.tenant` we operate on the root
263        // column (`headers`) and set / add the nested path inside its
264        // JSON value. If the user named the root column we mutate in
265        // place; otherwise we create a fresh JSON column for every row.
266        if let Some(dot_pos) = tenant_col.find('.') {
267            let (root, tail) = tenant_col.split_at(dot_pos);
268            let tail = &tail[1..]; // drop leading '.'
269            return self.inject_dotted_tenant(query, root, tail);
270        }
271
272        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
273            return Err(RedDBError::Query(format!(
274                "INSERT into tenant-scoped table '{}' requires an active tenant — \
275                 run SET TENANT '<id>' first or name column '{}' explicitly",
276                query.table, tenant_col
277            )));
278        };
279
280        let mut augmented = query.clone();
281        augmented.columns.push(tenant_col);
282        let lit = Value::text(tenant_id.clone());
283        for row in augmented.values.iter_mut() {
284            row.push(lit.clone());
285        }
286        for row in augmented.value_exprs.iter_mut() {
287            row.push(crate::storage::query::ast::Expr::Literal {
288                value: lit.clone(),
289                span: crate::storage::query::ast::Span::synthetic(),
290            });
291        }
292        Ok(Some(augmented))
293    }
294
295    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
296    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
297    /// nested JSON instead of appending a flat column.
298    ///
299    /// Cases:
300    /// * Root column already in the INSERT list → mutate per-row JSON
301    ///   (parse, set path, re-serialize).
302    /// * Root column absent → create a fresh `{tail: tenant}` JSON
303    ///   object and append the root column to the INSERT.
304    fn inject_dotted_tenant(
305        &self,
306        query: &InsertQuery,
307        root: &str,
308        tail: &str,
309    ) -> RedDBResult<Option<InsertQuery>> {
310        let active_tenant = crate::runtime::impl_core::current_tenant();
311        let mut augmented = query.clone();
312        let root_idx = augmented
313            .columns
314            .iter()
315            .position(|c| c.eq_ignore_ascii_case(root));
316
317        if let Some(idx) = root_idx {
318            // User supplied the root column. Per-row: if the dotted
319            // tail is already present we trust the user (admin / bulk
320            // loader scenario); otherwise fill from the active
321            // tenant. An unbound tenant is only an error when some
322            // row actually needs filling.
323            for row in augmented.values.iter_mut() {
324                let Some(slot) = row.get_mut(idx) else {
325                    continue;
326                };
327                if dotted_tail_already_set(slot, tail) {
328                    continue;
329                }
330                let Some(tenant_id) = &active_tenant else {
331                    return Err(RedDBError::Query(format!(
332                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
333                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
334                        query.table, root, tail
335                    )));
336                };
337                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
338            }
339            // Expression row is kept in sync by re-wrapping the
340            // mutated literal; the canonical path will re-evaluate
341            // against the same JSON shape.
342            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
343                if let Some(slot) = row.get_mut(idx) {
344                    let new_value = augmented
345                        .values
346                        .get(row_idx)
347                        .and_then(|v| v.get(idx))
348                        .cloned()
349                        .unwrap_or(Value::Null);
350                    *slot = crate::storage::query::ast::Expr::Literal {
351                        value: new_value,
352                        span: crate::storage::query::ast::Span::synthetic(),
353                    };
354                }
355            }
356        } else {
357            // No root column in the INSERT list — auto-fill needs a
358            // bound tenant to synthesise one. Error loud so we never
359            // create a tenant-less row that RLS would then hide.
360            let Some(tenant_id) = &active_tenant else {
361                return Err(RedDBError::Query(format!(
362                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
363                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
364                    query.table, root, tail
365                )));
366            };
367            // Create a fresh JSON column with only the tenant path set.
368            augmented.columns.push(root.to_string());
369            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
370            for row in augmented.values.iter_mut() {
371                row.push(fresh.clone());
372            }
373            for row in augmented.value_exprs.iter_mut() {
374                row.push(crate::storage::query::ast::Expr::Literal {
375                    value: fresh.clone(),
376                    span: crate::storage::query::ast::Span::synthetic(),
377                });
378            }
379        }
380
381        Ok(Some(augmented))
382    }
383
384    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
385    /// `lsns` is empty because events fire at commit time.
386    fn delete_entities_batch(
387        &self,
388        collection: &str,
389        ids: &[EntityId],
390    ) -> RedDBResult<(u64, Vec<u64>)> {
391        if ids.is_empty() {
392            return Ok((0, vec![]));
393        }
394
395        let store = self.db().store();
396        let Some(manager) = store.get_collection(collection) else {
397            return Ok((0, vec![]));
398        };
399
400        let active_xid = self.current_xid();
401        let conn_id = crate::runtime::impl_core::current_connection_id();
402        let mut autocommit_xid = None;
403        let mut tombstoned_ids = Vec::new();
404        let mut tombstoned_entities = Vec::new();
405        let mut physical_delete_ids = Vec::new();
406        let table_row_resolver =
407            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
408        // Phase 3: versioned graph collections tombstone node/edge
409        // deletes (xmax stamp) instead of physically removing them, so
410        // `AS OF` time-travel can still resolve the pre-delete version.
411        // Non-versioned graph collections keep the legacy physical
412        // delete (no history). Row entities (Phase 1/2) always tombstone
413        // under universal MVCC and are unaffected by this flag.
414        //
415        // Vectors (Phase 3b): versioned vector deletes tombstone (xmax
416        // stamp) instead of physically dropping, so history is retained.
417        // The `VECTOR SEARCH` read path post-filters every candidate
418        // through the current-snapshot visibility gate (which hides
419        // `xmax != 0` even in autocommit), so a tombstoned versioned
420        // vector never reaches live search. Non-versioned vectors keep
421        // the legacy physical delete.
422        let versioned_collection = self.vcs_is_versioned(collection).unwrap_or(false);
423
424        for &id in ids {
425            let Some(mut entity) = manager.get(id) else {
426                continue;
427            };
428            let is_versioned_graph = versioned_collection
429                && matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_));
430            let is_versioned_vector =
431                versioned_collection && matches!(entity.data, EntityData::Vector(_));
432            if matches!(entity.data, EntityData::Row(_))
433                || is_versioned_graph
434                || is_versioned_vector
435            {
436                let previous_xmax = entity.xmax;
437                if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
438                    if table_row_resolver.resolve_candidate(&entity).is_none() {
439                        continue;
440                    }
441                } else if is_versioned_vector {
442                    // Versioned vectors gate the delete on the statement
443                    // snapshot (not a crude `xmax != 0`) so a concurrent
444                    // delete whose snapshot predates a still-uncommitted
445                    // tombstone still targets the row, records its own
446                    // pending tombstone, and conflicts at commit
447                    // (first-committer-wins).
448                    if table_row_resolver.resolve_read_candidate(&entity).is_none() {
449                        continue;
450                    }
451                } else if entity.xmax != 0 {
452                    continue;
453                }
454
455                let xid = match active_xid {
456                    Some(xid) => xid,
457                    None => match autocommit_xid {
458                        Some(xid) => xid,
459                        None => {
460                            let mgr = self.snapshot_manager();
461                            let xid = mgr.begin();
462                            autocommit_xid = Some(xid);
463                            xid
464                        }
465                    },
466                };
467                entity.set_xmax(xid);
468                if manager.update(entity.clone()).is_ok() {
469                    if active_xid.is_some() {
470                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
471                    }
472                    tombstoned_entities.push(entity);
473                    tombstoned_ids.push(id);
474                }
475            } else {
476                physical_delete_ids.push(id);
477            }
478        }
479
480        if let Some(xid) = autocommit_xid {
481            self.snapshot_manager().commit(xid);
482        }
483
484        let mut affected = tombstoned_ids.len() as u64;
485        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
486        if active_xid.is_some() {
487            store
488                .persist_entities_to_pager(collection, &tombstoned_entities)
489                .map_err(|err| RedDBError::Internal(err.to_string()))?;
490        } else {
491            store
492                .persist_entities_to_pager(collection, &tombstoned_entities)
493                .map_err(|err| RedDBError::Internal(err.to_string()))?;
494            for id in &tombstoned_ids {
495                store.context_index().remove_entity(*id);
496                let lsn = self.cdc_emit(
497                    crate::replication::cdc::ChangeOperation::Delete,
498                    collection,
499                    id.raw(),
500                    "entity",
501                );
502                lsns.push(lsn);
503            }
504        }
505
506        let deleted_ids = store
507            .delete_batch(collection, &physical_delete_ids)
508            .map_err(|err| RedDBError::Internal(err.to_string()))?;
509        affected += deleted_ids.len() as u64;
510        for id in &deleted_ids {
511            store.context_index().remove_entity(*id);
512            let lsn = self.cdc_emit(
513                crate::replication::cdc::ChangeOperation::Delete,
514                collection,
515                id.raw(),
516                "entity",
517            );
518            lsns.push(lsn);
519        }
520
521        Ok((affected, lsns))
522    }
523
524    /// Flushes context-index updates and CDC for each applied mutation.
525    /// Returns one LSN per entity in the same order as `applied`.
526    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
527        if applied.is_empty() {
528            return Ok(Vec::new());
529        }
530
531        let store = self.db().store();
532        if applied.iter().any(|item| item.context_index_dirty) {
533            store.context_index().index_entities(
534                &applied[0].collection,
535                applied
536                    .iter()
537                    .filter(|item| item.context_index_dirty)
538                    .map(|item| &item.entity),
539            );
540        }
541
542        for item in applied {
543            self.refresh_update_secondary_indexes(item)?;
544        }
545
546        let mut lsns = Vec::with_capacity(applied.len());
547        for item in applied {
548            let lsn = self.cdc_emit_prebuilt(
549                crate::replication::cdc::ChangeOperation::Update,
550                &item.collection,
551                &item.entity,
552                update_cdc_item_kind(self, &item.collection, &item.entity),
553                item.metadata.as_ref(),
554                false,
555            );
556            lsns.push(lsn);
557        }
558        Ok(lsns)
559    }
560
561    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
562        self.persist_applied_entity_mutations(applied)
563    }
564
565    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
566        if applied.pre_mutation_fields.is_empty() {
567            return Ok(());
568        }
569        let post = entity_row_fields_snapshot(&applied.entity);
570        if post.is_empty() {
571            return Ok(());
572        }
573
574        let indexed_cols = self
575            .index_store_ref()
576            .indexed_columns_set(&applied.collection);
577        if indexed_cols.is_empty() {
578            return Ok(());
579        }
580
581        if let Some(old_version) = applied.replaced_entity.as_ref() {
582            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
583                .pre_mutation_fields
584                .iter()
585                .filter(|(col, _)| indexed_cols.contains(col))
586                .cloned()
587                .collect();
588            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
589                .iter()
590                .filter(|(col, _)| indexed_cols.contains(col))
591                .cloned()
592                .collect();
593            if !old_index_fields.is_empty() {
594                self.index_store_ref()
595                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
596                    .map_err(crate::RedDBError::Internal)?;
597            }
598            if !new_index_fields.is_empty() {
599                self.index_store_ref()
600                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
601                    .map_err(crate::RedDBError::Internal)?;
602            }
603            return Ok(());
604        }
605
606        let damage =
607            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
608        if damage
609            .touched_columns()
610            .into_iter()
611            .any(|col| indexed_cols.contains(col))
612        {
613            self.index_store_ref()
614                .index_entity_update(
615                    &applied.collection,
616                    applied.id,
617                    &applied.pre_mutation_fields,
618                    &post,
619                )
620                .map_err(crate::RedDBError::Internal)?;
621        }
622        Ok(())
623    }
624
625    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
626    ///
627    /// Each row in `query.values` is zipped with `query.columns` to produce a
628    /// set of named fields, which is then dispatched based on entity_type.
629    pub fn execute_insert(
630        &self,
631        raw_query: &str,
632        query: &InsertQuery,
633    ) -> RedDBResult<RuntimeQueryResult> {
634        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
635        // CollectionContract gate (#49): single entry point for the
636        // operator's collection-level write rules. Today this is a
637        // no-op for INSERT (APPEND ONLY permits insert); routing
638        // through the gate now means future contract bits — versioned,
639        // vault-only writes — plug in once instead of per verb.
640        crate::runtime::collection_contract::CollectionContractGate::check(
641            self,
642            &query.table,
643            crate::runtime::collection_contract::MutationKind::Insert,
644        )?;
645        // Phase 2.5.4 table-scoped tenancy: if the target table is
646        // tenant-scoped and the user didn't name the tenant column,
647        // auto-inject it with the thread-local `CURRENT_TENANT()`
648        // value. When the column is named explicitly we trust the
649        // caller (useful for admin tooling that writes on behalf of
650        // specific tenants). An unbound tenant on an implicit-fill
651        // path errors up front rather than producing a row the RLS
652        // policy would silently hide.
653        let augmented_owned;
654        let query = match self.maybe_inject_tenant_column(query)? {
655            Some(new_q) => {
656                augmented_owned = new_q;
657                &augmented_owned
658            }
659            None => query,
660        };
661        self.check_insert_column_policy(query)?;
662        if let Some(ref embed_config) = query.auto_embed {
663            let provider = crate::ai::parse_provider(&embed_config.provider)?;
664            // S3 / #711: planner-level provider gate. Runs before the
665            // local-model preflight and the API-key resolver so neither
666            // side-effect fires when policy denies.
667            crate::runtime::ai::provider_gate::enforce(self, &provider)?;
668            if matches!(provider, crate::ai::AiProvider::Local) {
669                crate::runtime::ai::local_embedding::ensure_local_embedding_available()?;
670                // Issue #682 — pre-flight the local model registry before
671                // any row write. Missing model, uninstalled artifacts,
672                // wrong task, and disabled-feature failures surface as
673                // deterministic errors that leave the target collection
674                // untouched, satisfying the "no partial writes on
675                // embedding failure" criterion for the failure modes
676                // owned by the local provider.
677                let model_name = embed_config.model.as_deref().map(str::trim).unwrap_or("");
678                if model_name.is_empty() {
679                    return Err(RedDBError::Query(
680                        "AUTO EMBED with provider=local requires MODEL '<registered-model-name>'; \
681                         the local provider does not have an implicit default model"
682                            .to_string(),
683                    ));
684                }
685                crate::runtime::ai::local_embedding::preflight_local_embedding(
686                    &self.inner.db,
687                    model_name,
688                )?;
689            }
690        }
691
692        let mut inserted_count: u64 = 0;
693        let effective_rows =
694            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
695
696        // Ensure the collection exists (auto-create on first insert).
697        let store = self.inner.db.store();
698        let _ = store.get_or_create_collection(&query.table);
699        let declared_model = self
700            .db()
701            .collection_contract_arc(&query.table)
702            .map(|contract| contract.declared_model);
703
704        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
705            if query.returning.is_some() {
706                Some(Vec::with_capacity(effective_rows.len()))
707            } else {
708                None
709            };
710        let mut returning_result: Option<UnifiedResult> = None;
711
712        if matches!(query.entity_type, InsertEntityType::Row)
713            && !matches!(
714                declared_model,
715                Some(crate::catalog::CollectionModel::TimeSeries)
716            )
717        {
718            // Issue #523 + #524: blockchain collections seal each row into the
719            // chain. When the caller omits the reserved columns, the engine
720            // auto-fills (#523). When the caller supplies any reserved column,
721            // the values are validated against the current tip and a mismatch
722            // surfaces a `BlockchainConflict:` error mapped to HTTP 409 (#524).
723            //
724            // The whole batch runs under a per-collection chain lock so two
725            // concurrent submitters can't both bind to the same prev_hash —
726            // the loser observes the advanced tip and gets 409 with the new
727            // tip so it can retry.
728            let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
729            let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
730                Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
731            } else {
732                None
733            };
734            let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
735
736            // Issue #525 — refuse new blocks if the chain has been marked
737            // `integrity = broken` until an admin clears the flag.
738            if chain_mode && self.is_chain_integrity_broken(&query.table) {
739                return Err(RedDBError::InvalidOperation(format!(
740                    "ChainIntegrityBroken: collection '{}' is locked until \
741                     POST /collections/{}/clear-integrity-flag is called by an admin",
742                    query.table, query.table
743                )));
744            }
745
746            // Pull the tip from the in-memory cache; fall back to a one-time
747            // scan if the cache hasn't seen this collection yet (cold start
748            // after restart). Cache is updated below as rows are sealed.
749            let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
750                if chain_mode {
751                    let mut cache = self.inner.chain_tip_cache.lock();
752                    if let Some(existing) = cache.get(&query.table) {
753                        Some(existing.clone())
754                    } else if let Some(scanned) =
755                        crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
756                    {
757                        cache.insert(query.table.clone(), scanned.clone());
758                        Some(scanned)
759                    } else {
760                        None
761                    }
762                } else {
763                    None
764                };
765
766            let mut rows = Vec::with_capacity(effective_rows.len());
767            for row_values in &effective_rows {
768                if row_values.len() != query.columns.len() {
769                    return Err(RedDBError::Query(format!(
770                        "INSERT column count ({}) does not match value count ({})",
771                        query.columns.len(),
772                        row_values.len()
773                    )));
774                }
775                let (mut fields, mut metadata) =
776                    split_insert_metadata(self, &query.columns, row_values)?;
777                if chain_mode {
778                    use crate::runtime::blockchain_kind::{
779                        chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
780                        COL_TIMESTAMP, RESERVED_COLUMNS,
781                    };
782                    let supplied_height = fields
783                        .iter()
784                        .find(|(k, _)| k == COL_BLOCK_HEIGHT)
785                        .map(|(_, v)| v.clone());
786                    let supplied_prev = fields
787                        .iter()
788                        .find(|(k, _)| k == COL_PREV_HASH)
789                        .map(|(_, v)| v.clone());
790                    let supplied_ts = fields
791                        .iter()
792                        .find(|(k, _)| k == COL_TIMESTAMP)
793                        .map(|(_, v)| v.clone());
794                    let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
795                    let user_supplied_any = supplied_height.is_some()
796                        || supplied_prev.is_some()
797                        || supplied_ts.is_some()
798                        || supplied_hash;
799
800                    fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
801                    let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
802
803                    let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
804                        Some(t) => (t.hash, t.height + 1),
805                        None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
806                    };
807                    let server_now = crate::runtime::blockchain_kind::now_ms();
808
809                    let (use_prev, use_height, use_ts) = if user_supplied_any {
810                        // Caller is participating in the chain protocol —
811                        // every field must be supplied AND match the tip.
812                        if supplied_hash {
813                            return Err(chain_conflict_error(
814                                tip_next_height.saturating_sub(1),
815                                tip_prev_hash,
816                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
817                                server_now,
818                                "hash column is engine-computed and cannot be supplied",
819                            ));
820                        }
821                        let caller_prev = match &supplied_prev {
822                            Some(Value::Blob(b)) if b.len() == 32 => {
823                                let mut a = [0u8; 32];
824                                a.copy_from_slice(b);
825                                a
826                            }
827                            Some(Value::Text(s)) if s.len() == 64 => {
828                                // Accept hex-encoded prev_hash so JSON / SQL
829                                // callers without literal-blob syntax can
830                                // still participate in the chain protocol.
831                                let mut a = [0u8; 32];
832                                let mut ok = true;
833                                for (i, slot) in a.iter_mut().enumerate() {
834                                    let pair = &s.as_ref()[i * 2..i * 2 + 2];
835                                    match u8::from_str_radix(pair, 16) {
836                                        Ok(byte) => *slot = byte,
837                                        Err(_) => {
838                                            ok = false;
839                                            break;
840                                        }
841                                    }
842                                }
843                                if !ok {
844                                    return Err(chain_conflict_error(
845                                        tip_next_height.saturating_sub(1),
846                                        tip_prev_hash,
847                                        chain_tip_full
848                                            .as_ref()
849                                            .map(|t| t.timestamp_ms)
850                                            .unwrap_or(0),
851                                        server_now,
852                                        "prev_hash is not valid hex",
853                                    ));
854                                }
855                                a
856                            }
857                            _ => {
858                                return Err(chain_conflict_error(
859                                    tip_next_height.saturating_sub(1),
860                                    tip_prev_hash,
861                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
862                                    server_now,
863                                    "prev_hash missing or not a 32-byte Blob",
864                                ));
865                            }
866                        };
867                        if caller_prev != tip_prev_hash {
868                            return Err(chain_conflict_error(
869                                tip_next_height.saturating_sub(1),
870                                tip_prev_hash,
871                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
872                                server_now,
873                                "prev_hash does not match current tip",
874                            ));
875                        }
876                        let caller_height = match &supplied_height {
877                            Some(Value::UnsignedInteger(v)) => *v,
878                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
879                            _ => {
880                                return Err(chain_conflict_error(
881                                    tip_next_height.saturating_sub(1),
882                                    tip_prev_hash,
883                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
884                                    server_now,
885                                    "block_height missing or not an unsigned integer",
886                                ));
887                            }
888                        };
889                        if caller_height != tip_next_height {
890                            return Err(chain_conflict_error(
891                                tip_next_height.saturating_sub(1),
892                                tip_prev_hash,
893                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
894                                server_now,
895                                "block_height does not match tip+1",
896                            ));
897                        }
898                        let caller_ts = match &supplied_ts {
899                            Some(Value::UnsignedInteger(v)) => *v,
900                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
901                            _ => {
902                                return Err(chain_conflict_error(
903                                    tip_next_height.saturating_sub(1),
904                                    tip_prev_hash,
905                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
906                                    server_now,
907                                    "timestamp missing or not an unsigned integer",
908                                ));
909                            }
910                        };
911                        let drift = (caller_ts as i128) - (server_now as i128);
912                        if drift.abs() > 60_000 {
913                            return Err(chain_conflict_error(
914                                tip_next_height.saturating_sub(1),
915                                tip_prev_hash,
916                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
917                                server_now,
918                                "timestamp outside ±60s of server_time",
919                            ));
920                        }
921                        (caller_prev, caller_height, caller_ts)
922                    } else {
923                        (tip_prev_hash, tip_next_height, server_now)
924                    };
925
926                    let (reserved, new_hash) =
927                        crate::runtime::blockchain_kind::make_block_reserved_fields(
928                            use_prev, use_height, use_ts, &payload,
929                        );
930                    fields.extend(reserved);
931                    chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
932                        height: use_height,
933                        hash: new_hash,
934                        timestamp_ms: use_ts,
935                    });
936                }
937                // Issue #522 — signed-writes verification. On collections
938                // created with `SIGNED_BY (...)` the row must carry valid
939                // `signer_pubkey` + `signature` reserved columns. Runs
940                // after chain_mode so canonical payload covers user-supplied
941                // fields only (blockchain reserved columns are filtered by
942                // `canonical_payload`; the two signed-writes reserved
943                // columns are split out before payload computation, then
944                // re-attached for storage). The blockchain + SIGNED_BY
945                // composition is owned by issue #526; we keep #522 to the
946                // non-chain path and let chain_mode collections punt to that
947                // slice rather than half-wire it here.
948                if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
949                    let (pk_col, sig_col, residual) =
950                        crate::runtime::signed_writes_kind::split_signature_fields(fields);
951                    let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
952                    let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
953                    crate::runtime::signed_writes_kind::verify_row(
954                        &reg,
955                        pk_col.as_ref().map(|c| c.bytes.as_slice()),
956                        sig_col.as_ref().map(|c| c.bytes.as_slice()),
957                        &payload,
958                    )
959                    .map_err(crate::runtime::signed_writes_kind::map_error)?;
960                    fields = residual;
961                    // Round-trip the reserved columns with the value
962                    // type the caller supplied (Text/hex on the SQL path,
963                    // Blob on the binary path). Keeps SELECT and WHERE
964                    // predicates symmetric with the INSERT shape.
965                    if let Some(col) = pk_col {
966                        fields.push((
967                            crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
968                            col.raw_value,
969                        ));
970                    }
971                    if let Some(col) = sig_col {
972                        fields.push((
973                            crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
974                            col.raw_value,
975                        ));
976                    }
977                }
978                merge_with_clauses(
979                    &mut metadata,
980                    query.ttl_ms,
981                    query.expires_at_ms,
982                    &query.with_metadata,
983                );
984                if let Some(snaps) = returning_snapshots.as_mut() {
985                    snaps.push(fields.clone());
986                }
987                rows.push(CreateRowInput {
988                    collection: query.table.clone(),
989                    fields,
990                    metadata,
991                    node_links: Vec::new(),
992                    vector_links: Vec::new(),
993                });
994            }
995            let outputs = self.create_rows_batch(CreateRowsBatchInput {
996                collection: query.table.clone(),
997                rows,
998                suppress_events: query.suppress_events,
999            })?;
1000            inserted_count = outputs.len() as u64;
1001
1002            // Chain mode: commit the new tip to the in-memory cache only after
1003            // the batch persisted successfully. If the batch threw mid-way the
1004            // cache stays on the previous tip and the chain lock releases.
1005            if chain_mode {
1006                if let Some(new_tip) = chain_tip_full.as_ref() {
1007                    self.inner
1008                        .chain_tip_cache
1009                        .lock()
1010                        .insert(query.table.clone(), new_tip.clone());
1011                }
1012            }
1013
1014            // Hypertable chunk routing: if this table was declared via
1015            // CREATE HYPERTABLE, register each row's time-column value
1016            // with the registry so chunk metadata (bounds, row counts,
1017            // TTL eligibility) stays current. This is what lets
1018            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
1019            // retention daemon sweep expired chunks without scanning
1020            // every row.
1021            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
1022                let time_col = &spec.time_column;
1023                // Find the column's index in the INSERT column list.
1024                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
1025                    for row in &effective_rows {
1026                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
1027                            if *n >= 0 {
1028                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
1029                            }
1030                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
1031                            let _ = self.inner.db.hypertables().route(&query.table, *n);
1032                        }
1033                    }
1034                }
1035            }
1036
1037            if let (Some(items), Some(snaps)) =
1038                (query.returning.as_ref(), returning_snapshots.take())
1039            {
1040                let snaps = row_insert_returning_snapshots(&outputs, snaps);
1041                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
1042            }
1043        } else {
1044            // Issue #419: surface the inserted entity id on every INSERT path.
1045            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
1046            // so a RETURNING clause (and the unconditional inserted_ids list,
1047            // below) can expose the engine-assigned id. TimeSeries (the row
1048            // branch in this else) still returns the not-supported error
1049            // because create_timeseries_point isn't plumbed through this fn.
1050            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
1051                Vec::with_capacity(effective_rows.len());
1052            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
1053            {
1054                Vec::with_capacity(effective_rows.len())
1055            } else {
1056                Vec::new()
1057            };
1058            if matches!(
1059                query.entity_type,
1060                InsertEntityType::Node | InsertEntityType::Edge
1061            ) {
1062                enum PreparedGraphInsert { // One validated NODE/EDGE INSERT row, built per row then drained into a single db batch.
1063                    Node {
1064                        fields: Vec<(String, Value)>, // column/value pairs from split_insert_metadata; kept for RETURNING snapshots.
1065                        input: CreateNodeInput, // passed to batch.add_node_with_type (node_type falls back to label).
1066                    },
1067                    Edge {
1068                        fields: Vec<(String, Value)>, // column/value pairs from split_insert_metadata; kept for RETURNING snapshots.
1069                        input: CreateEdgeInput, // passed to batch.add_edge (weight falls back to 1.0).
1070                    },
1071                }
1072
1073                let mut prepared = Vec::with_capacity(effective_rows.len());
1074                for row_values in &effective_rows {
1075                    if row_values.len() != query.columns.len() {
1076                        return Err(RedDBError::Query(format!(
1077                            "INSERT column count ({}) does not match value count ({})",
1078                            query.columns.len(),
1079                            row_values.len()
1080                        )));
1081                    }
1082
1083                    match query.entity_type {
1084                        InsertEntityType::Node => {
1085                            let (node_values, mut metadata) =
1086                                split_insert_metadata(self, &query.columns, row_values)?;
1087                            merge_with_clauses(
1088                                &mut metadata,
1089                                query.ttl_ms,
1090                                query.expires_at_ms,
1091                                &query.with_metadata,
1092                            );
1093                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
1094                            apply_collection_default_ttl_metadata(
1095                                self,
1096                                &query.table,
1097                                &mut metadata,
1098                            );
1099                            let (columns, values) = pairwise_columns_values(&node_values);
1100                            let label = find_column_value_string(&columns, &values, "label")?;
1101                            let node_type =
1102                                find_column_value_opt_string(&columns, &values, "node_type");
1103                            let properties = extract_remaining_properties(
1104                                &columns,
1105                                &values,
1106                                &["label", "node_type"],
1107                            );
1108                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1109                                properties.iter().map(|(key, _)| key.as_str()),
1110                                &format!("node '{}'", query.table),
1111                            )?;
1112                            prepared.push(PreparedGraphInsert::Node {
1113                                fields: node_values,
1114                                input: CreateNodeInput {
1115                                    collection: query.table.clone(),
1116                                    label,
1117                                    node_type,
1118                                    properties,
1119                                    metadata,
1120                                    embeddings: Vec::new(),
1121                                    table_links: Vec::new(),
1122                                    node_links: Vec::new(),
1123                                },
1124                            });
1125                        }
1126                        InsertEntityType::Edge => {
1127                            let (edge_values, mut metadata) =
1128                                split_insert_metadata(self, &query.columns, row_values)?;
1129                            merge_with_clauses(
1130                                &mut metadata,
1131                                query.ttl_ms,
1132                                query.expires_at_ms,
1133                                &query.with_metadata,
1134                            );
1135                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
1136                            apply_collection_default_ttl_metadata(
1137                                self,
1138                                &query.table,
1139                                &mut metadata,
1140                            );
1141                            let (columns, values) = pairwise_columns_values(&edge_values);
1142                            let label = find_column_value_string(&columns, &values, "label")?;
1143                            ensure_non_tree_structural_edge_label(&label)?;
1144                            let from_id = resolve_edge_endpoint_any(
1145                                self.inner.db.store().as_ref(),
1146                                &query.table,
1147                                &columns,
1148                                &values,
1149                                &["from_rid", "from"],
1150                            )?;
1151                            let to_id = resolve_edge_endpoint_any(
1152                                self.inner.db.store().as_ref(),
1153                                &query.table,
1154                                &columns,
1155                                &values,
1156                                &["to_rid", "to"],
1157                            )?;
1158                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
1159                            let properties = extract_remaining_properties(
1160                                &columns,
1161                                &values,
1162                                &["label", "from_rid", "to_rid", "from", "to", "weight"],
1163                            );
1164                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1165                                properties.iter().map(|(key, _)| key.as_str()),
1166                                &format!("edge '{}'", query.table),
1167                            )?;
1168                            prepared.push(PreparedGraphInsert::Edge {
1169                                fields: edge_values,
1170                                input: CreateEdgeInput {
1171                                    collection: query.table.clone(),
1172                                    label,
1173                                    from: EntityId::new(from_id),
1174                                    to: EntityId::new(to_id),
1175                                    weight,
1176                                    properties,
1177                                    metadata,
1178                                },
1179                            });
1180                        }
1181                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1182                    }
1183                }
1184
1185                ensure_graph_insert_contract(self, &query.table)?;
1186                let mut batch = self.inner.db.batch();
1187                for item in prepared {
1188                    match item {
1189                        PreparedGraphInsert::Node { fields, input } => {
1190                            if query.returning.is_some() {
1191                                returning_field_snaps.push(fields);
1192                            }
1193                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1194                            batch = batch.add_node_with_type(
1195                                input.collection,
1196                                input.label,
1197                                node_type,
1198                                input.properties.into_iter().collect(),
1199                                input.metadata.into_iter().collect(),
1200                            );
1201                        }
1202                        PreparedGraphInsert::Edge { fields, input } => {
1203                            if query.returning.is_some() {
1204                                returning_field_snaps.push(fields);
1205                            }
1206                            batch = batch.add_edge(
1207                                input.collection,
1208                                input.label,
1209                                input.from,
1210                                input.to,
1211                                input.weight.unwrap_or(1.0),
1212                                input.properties.into_iter().collect(),
1213                                input.metadata.into_iter().collect(),
1214                            );
1215                        }
1216                    }
1217                }
1218                let batch_result = batch
1219                    .execute()
1220                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1221                let (ids, entity_kind) = match query.entity_type {
1222                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1223                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1224                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1225                };
1226                for id in &ids {
1227                    self.stamp_xmin_if_in_txn(&query.table, *id);
1228                }
1229                if query.returning.is_some() {
1230                    returning_field_snaps = graph_insert_returning_snapshots(
1231                        self.inner.db.store().as_ref(),
1232                        &query.table,
1233                        &ids,
1234                    );
1235                }
1236                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1237                let store = self.inner.db.store();
1238                entity_outputs.extend(ids.iter().map(|id| {
1239                    crate::application::entity::CreateEntityOutput {
1240                        id: *id,
1241                        entity: store.get(&query.table, *id),
1242                    }
1243                }));
1244                inserted_count = ids.len() as u64;
1245            } else {
1246                for row_values in &effective_rows {
1247                    if row_values.len() != query.columns.len() {
1248                        return Err(RedDBError::Query(format!(
1249                            "INSERT column count ({}) does not match value count ({})",
1250                            query.columns.len(),
1251                            row_values.len()
1252                        )));
1253                    }
1254
1255                    match query.entity_type {
1256                        InsertEntityType::Row => {
1257                            if query.returning.is_some() {
1258                                return Err(RedDBError::Query(
1259                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1260                                    .to_string(),
1261                            ));
1262                            }
1263                            let (fields, mut metadata) =
1264                                split_insert_metadata(self, &query.columns, row_values)?;
1265                            merge_with_clauses(
1266                                &mut metadata,
1267                                query.ttl_ms,
1268                                query.expires_at_ms,
1269                                &query.with_metadata,
1270                            );
1271                            self.insert_timeseries_point(&query.table, fields, metadata)?;
1272                        }
1273                        InsertEntityType::Node | InsertEntityType::Edge => {
1274                            unreachable!("NODE and EDGE are handled by the prepared graph path")
1275                        }
1276                        InsertEntityType::Vector => {
1277                            let (vector_values, mut metadata) =
1278                                split_insert_metadata(self, &query.columns, row_values)?;
1279                            merge_with_clauses(
1280                                &mut metadata,
1281                                query.ttl_ms,
1282                                query.expires_at_ms,
1283                                &query.with_metadata,
1284                            );
1285                            let (columns, values) = pairwise_columns_values(&vector_values);
1286                            let dense = find_column_value_vec_f32_any(
1287                                &columns,
1288                                &values,
1289                                &["dense", "embedding"],
1290                            )?;
1291                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1292                            let content =
1293                                find_column_value_opt_string(&columns, &values, "content");
1294                            if query.returning.is_some() {
1295                                returning_field_snaps.push(vector_values.clone());
1296                            }
1297                            let input = CreateVectorInput {
1298                                collection: query.table.clone(),
1299                                dense,
1300                                content,
1301                                metadata,
1302                                link_row: None,
1303                                link_node: None,
1304                            };
1305                            entity_outputs.push(self.create_vector(input)?);
1306                        }
1307                        InsertEntityType::Document => {
1308                            let (document_values, mut metadata) =
1309                                split_insert_metadata(self, &query.columns, row_values)?;
1310                            merge_with_clauses(
1311                                &mut metadata,
1312                                query.ttl_ms,
1313                                query.expires_at_ms,
1314                                &query.with_metadata,
1315                            );
1316                            let (columns, values) = pairwise_columns_values(&document_values);
1317                            let body = find_document_body_json(&columns, &values)?;
1318                            let input = CreateDocumentInput {
1319                                collection: query.table.clone(),
1320                                body,
1321                                metadata,
1322                                node_links: Vec::new(),
1323                                vector_links: Vec::new(),
1324                            };
1325                            let output = self.create_document(input)?;
1326                            if query.returning.is_some() {
1327                                let fields = output
1328                                    .entity
1329                                    .as_ref()
1330                                    .map(entity_row_fields_snapshot)
1331                                    .filter(|fields| !fields.is_empty())
1332                                    .unwrap_or(document_values);
1333                                returning_field_snaps.push(fields);
1334                            }
1335                            entity_outputs.push(output);
1336                        }
1337                        InsertEntityType::Kv => {
1338                            let (kv_values, mut metadata) =
1339                                split_insert_metadata(self, &query.columns, row_values)?;
1340                            merge_with_clauses(
1341                                &mut metadata,
1342                                query.ttl_ms,
1343                                query.expires_at_ms,
1344                                &query.with_metadata,
1345                            );
1346                            let (columns, values) = pairwise_columns_values(&kv_values);
1347                            let key = find_column_value_string(&columns, &values, "key")?;
1348                            let value = find_column_value(&columns, &values, "value")?;
1349                            if query.returning.is_some() {
1350                                returning_field_snaps.push(kv_values.clone());
1351                            }
1352                            let input = CreateKvInput {
1353                                collection: query.table.clone(),
1354                                key,
1355                                value,
1356                                metadata,
1357                            };
1358                            entity_outputs.push(self.create_kv(input)?);
1359                        }
1360                    }
1361
1362                    inserted_count += 1;
1363                }
1364            }
1365
1366            if let Some(items) = query.returning.as_ref() {
1367                if !entity_outputs.is_empty() {
1368                    returning_result = Some(build_returning_result(
1369                        items,
1370                        &returning_field_snaps,
1371                        Some(&entity_outputs),
1372                    ));
1373                }
1374            }
1375        }
1376
1377        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
1378        if let Some(ref embed_config) = query.auto_embed {
1379            let store = self.inner.db.store();
1380            let provider = crate::ai::parse_provider(&embed_config.provider)?;
1381            let is_local_provider = matches!(provider, crate::ai::AiProvider::Local);
1382            // Local provider runs in-process — no API key path applies.
1383            // The pre-flight above already required `MODEL '<name>'`
1384            // for the local case, so the unwrap_or default below only
1385            // ever fires for OpenAI-compatible providers.
1386            let api_key = if is_local_provider {
1387                String::new()
1388            } else {
1389                crate::ai::resolve_api_key_from_runtime(&provider, None, self)?
1390            };
1391            let model = embed_config.model.clone().unwrap_or_else(|| {
1392                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1393                    .ok()
1394                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1395            });
1396
1397            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
1398            let manager = store
1399                .get_collection(&query.table)
1400                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1401            let entities = manager.query_all(|_| true);
1402            let recent: Vec<_> = entities
1403                .into_iter()
1404                .rev()
1405                .take(effective_rows.len())
1406                .collect();
1407
1408            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
1409            let entity_combos: Vec<(usize, String)> = recent
1410                .iter()
1411                .enumerate()
1412                .filter_map(|(i, entity)| {
1413                    if let EntityData::Row(ref row) = entity.data {
1414                        if let Some(ref named) = row.named {
1415                            let texts: Vec<String> = embed_config
1416                                .fields
1417                                .iter()
1418                                .filter_map(|field| match named.get(field) {
1419                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1420                                    _ => None,
1421                                })
1422                                .collect();
1423                            if !texts.is_empty() {
1424                                return Some((i, texts.join(" ")));
1425                            }
1426                        }
1427                    }
1428                    None
1429                })
1430                .collect();
1431
1432            if !entity_combos.is_empty() {
1433                // Batch phase: single provider round-trip for all rows.
1434                let batch_texts: Vec<String> =
1435                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
1436
1437                // Issue #682 — when the provider is `local`, bypass
1438                // AiBatchClient (which is HTTP-only) and dispatch
1439                // directly through the in-process local embedding
1440                // backend. All texts go in one call, mirroring the
1441                // single-round-trip shape of the remote path. The
1442                // local backend does not perform intra-batch dedup —
1443                // each input position gets its own row in the output
1444                // — which keeps the per-row "create_vector" loop
1445                // below correct without additional fan-out logic.
1446                let embeddings = if is_local_provider {
1447                    let response = crate::runtime::ai::local_embedding::embed_local_with_db(
1448                        &self.inner.db,
1449                        &model,
1450                        batch_texts,
1451                    )?;
1452                    response.embeddings
1453                } else {
1454                    let batch_client =
1455                        crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1456
1457                    match tokio::runtime::Handle::try_current() {
1458                        Ok(handle) => tokio::task::block_in_place(|| {
1459                            handle.block_on(batch_client.embed_batch(
1460                                &provider,
1461                                &model,
1462                                &api_key,
1463                                batch_texts,
1464                            ))
1465                        }),
1466                        Err(_) => {
1467                            return Err(RedDBError::Query(
1468                                "AUTO EMBED requires a Tokio runtime context".to_string(),
1469                            ));
1470                        }
1471                    }
1472                    .map_err(|e| RedDBError::Query(e.to_string()))?
1473                };
1474
1475                // Distribute phase: persist one vector per non-empty embedding.
1476                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1477                    if dense.is_empty() {
1478                        continue;
1479                    }
1480                    self.create_vector(CreateVectorInput {
1481                        collection: query.table.clone(),
1482                        dense,
1483                        content: Some(combined.clone()),
1484                        metadata: Vec::new(),
1485                        link_row: None,
1486                        link_node: None,
1487                    })?;
1488                }
1489            }
1490        }
1491
1492        if inserted_count > 0 {
1493            self.note_table_write(&query.table);
1494        }
1495
1496        let mut result = RuntimeQueryResult::dml_result(
1497            raw_query.to_string(),
1498            inserted_count,
1499            "insert",
1500            "runtime-dml",
1501        );
1502        if let Some(returning) = returning_result {
1503            result.result = returning;
1504        }
1505        Ok(result)
1506    }
1507
1508    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1509        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1510            return Ok(());
1511        };
1512        if !auth_store.iam_authorization_enabled() {
1513            return Ok(());
1514        }
1515        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1516            return Ok(());
1517        };
1518
1519        let tenant = crate::runtime::impl_core::current_tenant();
1520        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1521        let request = crate::auth::ColumnAccessRequest {
1522            action: "insert".to_string(),
1523            schema: None,
1524            table: query.table.clone(),
1525            columns: query.columns.clone(),
1526        };
1527        let ctx = crate::auth::policies::EvalContext {
1528            principal_tenant: tenant.clone(),
1529            current_tenant: tenant,
1530            peer_ip: None,
1531            mfa_present: false,
1532            now_ms: crate::auth::now_ms(),
1533            principal_is_admin_role: role == crate::auth::Role::Admin,
1534            principal_is_platform_scoped: principal.tenant.is_none(),
1535        };
1536
1537        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1538        let table_allowed = matches!(
1539            outcome.table_decision,
1540            crate::auth::policies::Decision::Allow { .. }
1541                | crate::auth::policies::Decision::AdminBypass
1542        );
1543        if !table_allowed {
1544            return Err(RedDBError::Query(format!(
1545                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1546                outcome.table_resource.kind, outcome.table_resource.name
1547            )));
1548        }
1549        if let Some(denied) = outcome.first_denied_column() {
1550            return Err(RedDBError::Query(format!(
1551                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1552                denied.resource.kind, denied.resource.name
1553            )));
1554        }
1555
1556        Ok(())
1557    }
1558
1559    pub(crate) fn insert_timeseries_point(
1560        &self,
1561        collection: &str,
1562        fields: Vec<(String, Value)>,
1563        mut metadata: Vec<(String, MetadataValue)>,
1564    ) -> RedDBResult<EntityId> {
1565        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1566
1567        let (columns, values) = pairwise_columns_values(&fields);
1568        validate_timeseries_insert_columns(&columns)?;
1569
1570        // Issue #577 — AnalyticsSchemaRegistry hook. If the row carries
1571        // an `event_name` whose schema is registered, validate the
1572        // `payload` JSON against it BEFORE any write side-effect. On
1573        // failure we return a typed error and the row is not
1574        // persisted. When no schema is registered for the event name
1575        // (or no `event_name` column is supplied at all) we fall
1576        // through to the normal write path for back-compat with
1577        // existing timeseries rows.
1578        let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1579        let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1580        if let Some(event_name) = event_name_opt.as_deref() {
1581            let store_for_schema = self.inner.db.store();
1582            if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1583                .is_some()
1584            {
1585                let payload_json = payload_opt.as_deref().unwrap_or("{}");
1586                super::analytics_schema_registry::validate(
1587                    store_for_schema.as_ref(),
1588                    event_name,
1589                    payload_json,
1590                )
1591                .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1592            }
1593        }
1594
1595        // `metric` is required by the existing timeseries write path;
1596        // when an analytics-style row supplies `event_name` but not
1597        // `metric`, fall back to the event name so the storage path
1598        // still has a non-empty metric tag.
1599        let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1600            Some(m) => m,
1601            None => event_name_opt.clone().ok_or_else(|| {
1602                RedDBError::Query(
1603                    "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1604                )
1605            })?,
1606        };
1607        // `value` is optional for analytics-event rows (which are
1608        // semantically counts of 1); default to 1.0 when missing so
1609        // analytics inserts don't have to fabricate a metric value.
1610        let value = match find_column_value_opt_string(&columns, &values, "value") {
1611            Some(s) => s.parse::<f64>().unwrap_or(1.0),
1612            None => columns
1613                .iter()
1614                .position(|c| c.eq_ignore_ascii_case("value"))
1615                .and_then(|i| match &values[i] {
1616                    Value::Float(f) => Some(*f),
1617                    Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1618                    Value::UnsignedInteger(n) => Some(*n as f64),
1619                    _ => None,
1620                })
1621                .unwrap_or(1.0),
1622        };
1623        let timestamp_ns =
1624            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1625        let mut tags = find_timeseries_tags(&columns, &values)?;
1626        if let Some(ref name) = event_name_opt {
1627            tags.entry("event_name".to_string())
1628                .or_insert_with(|| name.clone());
1629        }
1630        if let Some(ref payload) = payload_opt {
1631            tags.entry("payload".to_string())
1632                .or_insert_with(|| payload.clone());
1633        }
1634
1635        let mut entity = UnifiedEntity::new(
1636            EntityId::new(0),
1637            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1638                series: collection.to_string(),
1639                metric: metric.clone(),
1640            })),
1641            EntityData::TimeSeries(crate::storage::TimeSeriesData {
1642                metric,
1643                timestamp_ns,
1644                value,
1645                tags,
1646            }),
1647        );
1648        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
1649        // or an autocommit xid (allocated and committed up-front so
1650        // future snapshots see the row as soon as it lands).
1651        let writer_xid = match self.current_xid() {
1652            Some(xid) => xid,
1653            None => {
1654                let mgr = self.snapshot_manager();
1655                let xid = mgr.begin();
1656                mgr.commit(xid);
1657                xid
1658            }
1659        };
1660        entity.set_xmin(writer_xid);
1661
1662        let store = self.inner.db.store();
1663        let id = store
1664            .insert_auto(collection, entity)
1665            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1666
1667        if !metadata.is_empty() {
1668            let _ = store.set_metadata(
1669                collection,
1670                id,
1671                Metadata::with_fields(metadata.into_iter().collect()),
1672            );
1673        }
1674
1675        self.cdc_emit(
1676            crate::replication::cdc::ChangeOperation::Insert,
1677            collection,
1678            id.raw(),
1679            "timeseries",
1680        );
1681
1682        Ok(id)
1683    }
1684
1685    /// Execute UPDATE table SET col=val, ... WHERE filter
1686    ///
1687    /// Scans the target collection, evaluates the WHERE filter against each
1688    /// record, and patches every matching entity.
1689    pub fn execute_update(
1690        &self,
1691        raw_query: &str,
1692        query: &UpdateQuery,
1693    ) -> RedDBResult<RuntimeQueryResult> {
1694        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1695        // Issue #523 — blockchain collections are immutable. Reject before
1696        // RLS / RETURNING work so the operator sees a clean 409-mapped
1697        // error instead of a partially-applied mutation surface.
1698        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1699            return Err(RedDBError::InvalidOperation(format!(
1700                "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1701                query.table
1702            )));
1703        }
1704        // CollectionContract gate (#50): runs the APPEND ONLY guard
1705        // (and any future contract bits) before RLS / RETURNING work
1706        // so the operator's immutability declaration is honoured
1707        // uniformly and the error message points at the DDL rather
1708        // than at a downstream symptom.
1709        crate::runtime::collection_contract::CollectionContractGate::check(
1710            self,
1711            &query.table,
1712            crate::runtime::collection_contract::MutationKind::Update,
1713        )?;
1714        ensure_update_target_contract(self, &query.table, query.target)?;
1715        ensure_graph_identity_update_target_allowed(query)?;
1716
1717        // Apply RLS augmentation first so every downstream path — plain
1718        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
1719        // same policy-filtered target set. This prevents RETURNING
1720        // from ever exposing rows the UPDATE policy would have
1721        // denied.
1722        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1723        let augmented_query: UpdateQuery;
1724        let effective_query: &UpdateQuery = if rls_gated {
1725            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1726                self,
1727                &query.table,
1728                crate::storage::query::ast::PolicyAction::Update,
1729            );
1730            let Some(policy) = rls_filter else {
1731                // No admitting policy: zero rows affected, empty
1732                // RETURNING (never leak rows the caller can't touch).
1733                let mut response = RuntimeQueryResult::dml_result(
1734                    raw_query.to_string(),
1735                    0,
1736                    "update",
1737                    "runtime-dml-rls",
1738                );
1739                if let Some(items) = query.returning.clone() {
1740                    response.result = build_returning_result(&items, &[], None);
1741                }
1742                return Ok(response);
1743            };
1744            let mut augmented = query.clone();
1745            augmented.filter = Some(match augmented.filter.take() {
1746                Some(existing) => {
1747                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1748                }
1749                None => policy,
1750            });
1751            augmented_query = augmented;
1752            &augmented_query
1753        } else {
1754            query
1755        };
1756
1757        // RETURNING wraps the inner executor and uses the touched-id
1758        // list the inner reports so the post-image reflects exactly
1759        // the rows the UPDATE actually mutated (not whatever a
1760        // separate SELECT might have observed).
1761        if let Some(items) = effective_query.returning.clone() {
1762            let mut inner_query = effective_query.clone();
1763            inner_query.returning = None;
1764            let (mut response, touched_ids) =
1765                self.execute_update_inner_tracked(raw_query, &inner_query)?;
1766
1767            let snapshots = if matches!(
1768                effective_query.target,
1769                UpdateTarget::Nodes | UpdateTarget::Edges
1770            ) {
1771                graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1772            } else {
1773                super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1774                    .row_snapshots(&touched_ids)
1775            };
1776
1777            response.result = build_returning_result(&items, &snapshots, None);
1778            response.engine = "runtime-dml-returning";
1779            return Ok(response);
1780        }
1781
1782        self.execute_update_inner(raw_query, effective_query)
1783    }
1784
1785    /// Back-compat shim: the older entry point ignored touched ids.
1786    fn execute_update_inner(
1787        &self,
1788        raw_query: &str,
1789        query: &UpdateQuery,
1790    ) -> RedDBResult<RuntimeQueryResult> {
1791        self.execute_update_inner_tracked(raw_query, query)
1792            .map(|(res, _)| res)
1793    }
1794
1795    fn execute_update_inner_tracked(
1796        &self,
1797        raw_query: &str,
1798        query: &UpdateQuery,
1799    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1800        let store = self.inner.db.store();
1801        let effective_filter = effective_update_filter(query);
1802        let compiled_plan = self.compile_update_plan(query)?;
1803        let needs_rmw_lock = update_needs_rmw_lock(query);
1804        let table_rmw_lock = if needs_rmw_lock {
1805            Some(
1806                self.inner
1807                    .rmw_locks
1808                    .lock_for(&query.table, "__table_rmw_update__"),
1809            )
1810        } else {
1811            None
1812        };
1813        let _table_rmw_guard = table_rmw_lock.as_ref().map(|lock| lock.lock());
1814        let mut touched_ids: Vec<EntityId> = Vec::new();
1815        let limit_cap = query.limit.map(|l| l as usize);
1816        let manager = store
1817            .get_collection(&query.table)
1818            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1819        let scan_limit = if query.order_by.is_empty() {
1820            limit_cap
1821        } else {
1822            None
1823        };
1824        let mut target_scan = super::dml_target_scan::DmlTargetScan::with_update_target(
1825            self,
1826            &query.table,
1827            effective_filter.as_ref(),
1828            scan_limit,
1829            query.target,
1830        );
1831        if needs_rmw_lock {
1832            target_scan = target_scan.with_live_table_rows();
1833        }
1834        let ids_to_update = target_scan.find_target_ids()?;
1835        let ids_to_update = if query.order_by.is_empty() {
1836            ids_to_update
1837        } else {
1838            ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1839        };
1840
1841        if needs_rmw_lock {
1842            return self.execute_update_inner_tracked_locked(
1843                raw_query,
1844                query,
1845                &compiled_plan,
1846                &ids_to_update,
1847                effective_filter.as_ref(),
1848            );
1849        }
1850
1851        let mut affected: u64 = 0;
1852        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1853            let mut applied_chunk = Vec::with_capacity(chunk.len());
1854            for entity in manager.get_many(chunk).into_iter().flatten() {
1855                let assignments =
1856                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1857                let applied = self.apply_materialized_update_for_entity(
1858                    query.table.clone(),
1859                    entity,
1860                    &compiled_plan,
1861                    assignments,
1862                )?;
1863                touched_ids.push(applied.id);
1864                applied_chunk.push(applied);
1865            }
1866            self.persist_update_chunk(&applied_chunk)?;
1867            affected += applied_chunk.len() as u64;
1868            let lsns = self.flush_update_chunk(&applied_chunk)?;
1869            if !query.suppress_events {
1870                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1871            }
1872        }
1873
1874        if affected > 0 {
1875            self.note_table_write(&query.table);
1876        }
1877
1878        Ok((
1879            RuntimeQueryResult::dml_result(
1880                raw_query.to_string(),
1881                affected,
1882                "update",
1883                "runtime-dml",
1884            ),
1885            touched_ids,
1886        ))
1887    }
1888
1889    fn execute_update_inner_tracked_locked(
1890        &self,
1891        raw_query: &str,
1892        query: &UpdateQuery,
1893        compiled_plan: &CompiledUpdatePlan,
1894        ids_to_update: &[EntityId],
1895        effective_filter: Option<&Filter>,
1896    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1897        let store = self.inner.db.store();
1898        let mut touched_ids = Vec::new();
1899        let mut lock_entries = Vec::new();
1900
1901        for id in ids_to_update {
1902            let Some(candidate) = store.get(&query.table, *id) else {
1903                continue;
1904            };
1905            let logical_id = candidate.logical_id();
1906            let lock_key = format!("row:{}", logical_id.raw());
1907            let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1908            lock_entries.push((lock_key, logical_id, rmw_lock));
1909        }
1910
1911        lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1912        lock_entries.dedup_by(|left, right| left.0 == right.0);
1913        let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1914
1915        let mut applied_chunk = Vec::new();
1916        for (_, logical_id, _) in &lock_entries {
1917            let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1918            else {
1919                continue;
1920            };
1921            if let Some(filter) = effective_filter {
1922                if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1923                    Some(self.inner.db.as_ref()),
1924                    &entity,
1925                    filter,
1926                    &query.table,
1927                    &query.table,
1928                ) {
1929                    continue;
1930                }
1931            }
1932
1933            let assignments =
1934                self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1935            let applied = self.apply_materialized_update_for_entity(
1936                query.table.clone(),
1937                entity,
1938                compiled_plan,
1939                assignments,
1940            )?;
1941            touched_ids.push(applied.id);
1942            applied_chunk.push(applied);
1943        }
1944
1945        let affected = applied_chunk.len() as u64;
1946        if !applied_chunk.is_empty() {
1947            self.persist_update_chunk(&applied_chunk)?;
1948            let lsns = self.flush_update_chunk(&applied_chunk)?;
1949            if !query.suppress_events {
1950                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1951            }
1952        }
1953
1954        if affected > 0 {
1955            self.note_table_write(&query.table);
1956        }
1957
1958        Ok((
1959            RuntimeQueryResult::dml_result(
1960                raw_query.to_string(),
1961                affected,
1962                "update",
1963                "runtime-dml",
1964            ),
1965            touched_ids,
1966        ))
1967    }
1968
1969    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1970        let mut static_field_assignments = Vec::new();
1971        let mut static_metadata_assignments = Vec::new();
1972        let mut dynamic_assignments = Vec::new();
1973        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1974        let mut row_modified_columns = Vec::new();
1975
1976        for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1977            let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1978            let metadata_key = resolve_sql_ttl_metadata_key(column);
1979            if compound_op.is_some() && metadata_key.is_some() {
1980                return Err(RedDBError::Query(format!(
1981                    "compound assignment is only supported for row fields: {column}"
1982                )));
1983            }
1984            if compound_op.is_none() {
1985                if let Ok(value) = fold_expr_to_value(expr.clone()) {
1986                    if let Some(metadata_key) = metadata_key {
1987                        let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1988                        let (canonical_key, canonical_value) =
1989                            canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1990                        static_metadata_assignments
1991                            .push((canonical_key.to_string(), canonical_value));
1992                    } else {
1993                        let value = self.resolve_crypto_sentinel(value)?;
1994                        static_field_assignments.push((
1995                            column.clone(),
1996                            normalize_row_update_assignment_with_plan(
1997                                &query.table,
1998                                column,
1999                                value,
2000                                row_contract_plan.as_ref(),
2001                            )?,
2002                        ));
2003                        row_modified_columns.push(column.clone());
2004                    }
2005                    continue;
2006                }
2007            }
2008
2009            dynamic_assignments.push(CompiledUpdateAssignment {
2010                column: column.clone(),
2011                expr: expr.clone(),
2012                compound_op,
2013                metadata_key,
2014                row_rule: if metadata_key.is_none() {
2015                    if let Some(plan) = row_contract_plan.as_ref() {
2016                        if plan.timestamps_enabled
2017                            && (column == "created_at" || column == "updated_at")
2018                        {
2019                            return Err(RedDBError::Query(format!(
2020                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
2021                                query.table, column
2022                            )));
2023                        }
2024                        if let Some(rule) = plan.declared_rules.get(column) {
2025                            Some(rule.clone())
2026                        } else if plan.strict_schema {
2027                            return Err(RedDBError::Query(format!(
2028                                "collection '{}' is strict and does not allow undeclared fields: {}",
2029                                query.table, column
2030                            )));
2031                        } else {
2032                            None
2033                        }
2034                    } else {
2035                        None
2036                    }
2037                } else {
2038                    None
2039                },
2040            });
2041            if metadata_key.is_none() {
2042                row_modified_columns.push(column.clone());
2043            }
2044        }
2045
2046        let row_modified_columns = dedupe_update_columns(row_modified_columns);
2047        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
2048            row_modified_columns.iter().any(|column| {
2049                plan.unique_columns
2050                    .keys()
2051                    .any(|unique| unique.eq_ignore_ascii_case(column))
2052            })
2053        });
2054
2055        if let Some(ttl_ms) = query.ttl_ms {
2056            static_metadata_assignments
2057                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
2058        }
2059        if let Some(expires_at_ms) = query.expires_at_ms {
2060            static_metadata_assignments.push((
2061                "_expires_at".to_string(),
2062                metadata_u64_to_value(expires_at_ms),
2063            ));
2064        }
2065        for (key, val) in &query.with_metadata {
2066            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
2067        }
2068
2069        Ok(CompiledUpdatePlan {
2070            static_field_assignments,
2071            static_metadata_assignments,
2072            dynamic_assignments,
2073            row_contract_plan,
2074            row_modified_columns,
2075            row_touches_unique_columns,
2076        })
2077    }
2078
2079    fn materialize_update_assignments_for_entity(
2080        &self,
2081        query: &UpdateQuery,
2082        entity: &UnifiedEntity,
2083        compiled_plan: &CompiledUpdatePlan,
2084    ) -> RedDBResult<MaterializedUpdateAssignments> {
2085        let mut assignments = MaterializedUpdateAssignments::default();
2086        let mut record: Option<UnifiedRecord> = None;
2087
2088        for assignment in &compiled_plan.dynamic_assignments {
2089            if assignment.compound_op.is_some()
2090                && !matches!(
2091                    entity.data,
2092                    EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
2093                )
2094            {
2095                return Err(RedDBError::Query(format!(
2096                    "compound assignment is only supported for row or graph UPDATE column '{}'",
2097                    assignment.column
2098                )));
2099            }
2100            if record.is_none() {
2101                record = runtime_any_record_from_entity_ref(entity);
2102            }
2103            let Some(record) = record.as_ref() else {
2104                return Err(RedDBError::Query(format!(
2105                    "UPDATE could not materialize runtime record for entity {} in '{}'",
2106                    entity.id.raw(),
2107                    query.table
2108                )));
2109            };
2110            let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
2111                Some(self.inner.db.as_ref()),
2112                &assignment.expr,
2113                record,
2114                Some(query.table.as_str()),
2115                Some(query.table.as_str()),
2116            )
2117            .ok_or_else(|| {
2118                RedDBError::Query(format!(
2119                    "failed to evaluate UPDATE expression for column '{}'",
2120                    assignment.column
2121                ))
2122            })?;
2123            let value = if let Some(op) = assignment.compound_op {
2124                evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
2125            } else {
2126                rhs
2127            };
2128
2129            if let Some(metadata_key) = assignment.metadata_key {
2130                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
2131                let (canonical_key, canonical_value) =
2132                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
2133                assignments
2134                    .dynamic_metadata_assignments
2135                    .push((canonical_key.to_string(), canonical_value));
2136            } else {
2137                assignments.dynamic_field_assignments.push((
2138                    assignment.column.clone(),
2139                    normalize_row_update_value_for_rule(
2140                        &query.table,
2141                        self.resolve_crypto_sentinel(value)?,
2142                        assignment.row_rule.as_ref(),
2143                    )?,
2144                ));
2145            }
2146        }
2147
2148        Ok(assignments)
2149    }
2150
2151    fn apply_materialized_update_for_entity(
2152        &self,
2153        collection: String,
2154        entity: UnifiedEntity,
2155        compiled_plan: &CompiledUpdatePlan,
2156        assignments: MaterializedUpdateAssignments,
2157    ) -> RedDBResult<AppliedEntityMutation> {
2158        if matches!(entity.data, EntityData::Row(_)) {
2159            return self.apply_loaded_sql_update_row_core(
2160                collection,
2161                entity,
2162                &compiled_plan.static_field_assignments,
2163                assignments.dynamic_field_assignments,
2164                &compiled_plan.static_metadata_assignments,
2165                assignments.dynamic_metadata_assignments,
2166                compiled_plan.row_contract_plan.as_ref(),
2167                &compiled_plan.row_modified_columns,
2168                compiled_plan.row_touches_unique_columns,
2169            );
2170        }
2171
2172        ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2173
2174        let operations = build_patch_operations_from_materialized_assignments(
2175            &entity,
2176            compiled_plan,
2177            assignments,
2178        );
2179        self.apply_loaded_patch_entity_core(
2180            collection,
2181            entity,
2182            crate::json::Value::Null,
2183            operations,
2184        )
2185    }
2186
2187    /// Execute DELETE FROM table WHERE filter
2188    pub fn execute_delete(
2189        &self,
2190        raw_query: &str,
2191        query: &DeleteQuery,
2192    ) -> RedDBResult<RuntimeQueryResult> {
2193        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2194        // Issue #523 — blockchain collections are immutable; see
2195        // execute_update for the same gate.
2196        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2197            return Err(RedDBError::InvalidOperation(format!(
2198                "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2199                query.table
2200            )));
2201        }
2202        // CollectionContract gate (#50) — see execute_update for
2203        // rationale. The gate handles APPEND ONLY rejection and is
2204        // the single point where future contract bits land.
2205        crate::runtime::collection_contract::CollectionContractGate::check(
2206            self,
2207            &query.table,
2208            crate::runtime::collection_contract::MutationKind::Delete,
2209        )?;
2210
2211        // RETURNING on DELETE: capture the pre-image via an internal
2212        // SELECT that reuses the same WHERE, then run the delete with
2213        // the RETURNING clause stripped, then project the captured
2214        // rows through the requested items. The extra SELECT is a
2215        // pragmatic MVP — a future pass can fuse the scan with the
2216        // delete to avoid the second pass over the heap.
2217        if let Some(items) = query.returning.clone() {
2218            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2219                RedDBError::Query(
2220                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2221                )
2222            })?;
2223            let captured = self.execute_query(&select_sql)?;
2224
2225            let mut inner_query = query.clone();
2226            inner_query.returning = None;
2227            let _ = self.execute_delete(raw_query, &inner_query)?;
2228
2229            let snapshots: Vec<Vec<(String, Value)>> = captured
2230                .result
2231                .records
2232                .iter()
2233                .map(|rec| {
2234                    rec.iter_fields()
2235                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2236                        .collect()
2237                })
2238                .collect();
2239            let affected = snapshots.len() as u64;
2240            let result = build_returning_result(&items, &snapshots, None);
2241
2242            let mut response = RuntimeQueryResult::dml_result(
2243                raw_query.to_string(),
2244                affected,
2245                "delete",
2246                "runtime-dml-returning",
2247            );
2248            response.result = result;
2249            return Ok(response);
2250        }
2251        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
2252        //
2253        // When the table has RLS enabled, gate the DELETE by the
2254        // per-role policy set: mutations only touch rows that *every*
2255        // matching `FOR DELETE` policy would accept. No policies =>
2256        // zero rows affected (PG restrictive-default).
2257        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2258            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2259                self,
2260                &query.table,
2261                crate::storage::query::ast::PolicyAction::Delete,
2262            );
2263            let Some(policy) = rls_filter else {
2264                return Ok(RuntimeQueryResult::dml_result(
2265                    raw_query.to_string(),
2266                    0,
2267                    "delete",
2268                    "runtime-dml-rls",
2269                ));
2270            };
2271            // Fold the policy predicate into the user's WHERE before
2272            // dispatching — the remainder of this function reads the
2273            // filter from `query` via `effective_delete_filter`, which
2274            // respects the updated value.
2275            let mut augmented = query.clone();
2276            augmented.filter = Some(match augmented.filter.take() {
2277                Some(existing) => {
2278                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2279                }
2280                None => policy,
2281            });
2282            return self.execute_delete_inner(raw_query, &augmented);
2283        }
2284        self.execute_delete_inner(raw_query, query)
2285    }
2286
2287    fn execute_delete_inner(
2288        &self,
2289        raw_query: &str,
2290        query: &DeleteQuery,
2291    ) -> RedDBResult<RuntimeQueryResult> {
2292        let effective_filter = effective_delete_filter(query);
2293
2294        // Find the rows that match the WHERE clause. The "find target
2295        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
2296        // the same scan strategy.
2297        let scan = super::dml_target_scan::DmlTargetScan::new(
2298            self,
2299            &query.table,
2300            effective_filter.as_ref(),
2301            None,
2302        );
2303        let ids_to_delete = scan.find_target_ids()?;
2304
2305        // For event-enabled collections, snapshot the pre-delete state
2306        // before rows are physically removed.
2307        let needs_delete_events =
2308            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2309        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2310            scan.row_json_pre_images(&ids_to_delete)
2311        } else {
2312            HashMap::new()
2313        };
2314
2315        let mut affected: u64 = 0;
2316        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2317            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2318            affected += count;
2319            if needs_delete_events && !lsns.is_empty() {
2320                // lsns.len() == actually-deleted entities; align with chunk ids.
2321                // `delete_batch` may skip missing entities, so we correlate by
2322                // the number returned (they're emitted in chunk order).
2323                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2324                self.emit_delete_events_for_collection(
2325                    &query.table,
2326                    deleted_chunk,
2327                    &lsns,
2328                    &pre_images,
2329                )?;
2330            }
2331        }
2332        pre_images.clear();
2333
2334        if affected > 0 {
2335            self.note_table_write(&query.table);
2336        }
2337
2338        Ok(RuntimeQueryResult::dml_result(
2339            raw_query.to_string(),
2340            affected,
2341            "delete",
2342            "runtime-dml",
2343        ))
2344    }
2345}
2346
2347/// Reject UPDATE … NODES/EDGES that assign to graph identity/topology
2348/// columns regardless of whether any row matches the WHERE clause. The
2349/// per-entity guard below covers only the matched-rows case, but ADR 0019
2350/// declares these columns immutable on the surface itself, so a zero-row
2351/// UPDATE should still surface the same error to operators and SDKs.
2352fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2353    if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2354        return Ok(());
2355    }
2356    for (column, _) in &query.assignment_exprs {
2357        if is_immutable_graph_identity_field(column) {
2358            return Err(RedDBError::Query(format!(
2359                "immutable graph field '{column}' cannot be updated"
2360            )));
2361        }
2362    }
2363    Ok(())
2364}
2365
2366fn ensure_graph_identity_update_allowed(
2367    entity: &UnifiedEntity,
2368    compiled_plan: &CompiledUpdatePlan,
2369    assignments: &MaterializedUpdateAssignments,
2370) -> RedDBResult<()> {
2371    if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2372        return Ok(());
2373    }
2374
2375    for (column, _) in compiled_plan
2376        .static_field_assignments
2377        .iter()
2378        .chain(assignments.dynamic_field_assignments.iter())
2379    {
2380        if is_immutable_graph_identity_field(column) {
2381            return Err(RedDBError::Query(format!(
2382                "immutable graph field '{column}' cannot be updated"
2383            )));
2384        }
2385    }
2386
2387    Ok(())
2388}
2389
/// Whether `column` names a graph identity/topology field that UPDATE must
/// never modify: `rid`, `label`, `from_rid`, `to_rid`, `from`, or `to`.
/// The comparison is ASCII case-insensitive.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const IMMUTABLE_FIELDS: [&str; 6] = ["rid", "label", "from_rid", "to_rid", "from", "to"];
    for field in IMMUTABLE_FIELDS {
        if field.eq_ignore_ascii_case(column) {
            return true;
        }
    }
    false
}
2395
2396fn build_patch_operations_from_materialized_assignments(
2397    entity: &UnifiedEntity,
2398    compiled_plan: &CompiledUpdatePlan,
2399    assignments: MaterializedUpdateAssignments,
2400) -> Vec<PatchEntityOperation> {
2401    let mut operations = Vec::with_capacity(
2402        compiled_plan.static_field_assignments.len()
2403            + compiled_plan.static_metadata_assignments.len()
2404            + assignments.dynamic_field_assignments.len()
2405            + assignments.dynamic_metadata_assignments.len(),
2406    );
2407
2408    for (column, value) in &compiled_plan.static_field_assignments {
2409        operations.push(PatchEntityOperation {
2410            op: PatchEntityOperationType::Set,
2411            path: update_patch_path_for_entity(entity, column),
2412            value: Some(storage_value_to_json(value)),
2413        });
2414    }
2415
2416    for (column, value) in assignments.dynamic_field_assignments {
2417        operations.push(PatchEntityOperation {
2418            op: PatchEntityOperationType::Set,
2419            path: update_patch_path_for_entity(entity, &column),
2420            value: Some(storage_value_to_json(&value)),
2421        });
2422    }
2423
2424    for (key, value) in &compiled_plan.static_metadata_assignments {
2425        operations.push(PatchEntityOperation {
2426            op: PatchEntityOperationType::Set,
2427            path: vec!["metadata".to_string(), key.clone()],
2428            value: Some(metadata_value_to_json(value)),
2429        });
2430    }
2431
2432    for (key, value) in assignments.dynamic_metadata_assignments {
2433        operations.push(PatchEntityOperation {
2434            op: PatchEntityOperationType::Set,
2435            path: vec!["metadata".to_string(), key],
2436            value: Some(metadata_value_to_json(&value)),
2437        });
2438    }
2439
2440    operations
2441}
2442
2443fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2444    if matches!(
2445        (&entity.kind, &entity.data),
2446        (
2447            crate::storage::EntityKind::GraphNode(_),
2448            EntityData::Node(_)
2449        )
2450    ) && column.eq_ignore_ascii_case("node_type")
2451    {
2452        return vec!["node_type".to_string()];
2453    }
2454    if matches!(
2455        (&entity.kind, &entity.data),
2456        (
2457            crate::storage::EntityKind::GraphEdge(_),
2458            EntityData::Edge(_)
2459        )
2460    ) && column.eq_ignore_ascii_case("weight")
2461    {
2462        return vec!["weight".to_string()];
2463    }
2464    vec!["fields".to_string(), column.to_string()]
2465}
2466
2467/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
2468/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
2469/// capture the pre-image before actually removing the rows. Returns
2470/// `None` when the input does not start with `DELETE`.
2471///
2472/// Case-insensitive on the keywords. Preserves everything between
2473/// the table name and the RETURNING clause, so WHERE / ORDER BY /
2474/// LIMIT survive untouched. The RETURNING tail — if present — is
2475/// truncated at the first top-level `RETURNING` token.
2476fn delete_to_select_sql(sql: &str) -> Option<String> {
2477    let trimmed = sql.trim_start();
2478    let lowered = trimmed.to_ascii_lowercase();
2479    if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
2480        return None;
2481    }
2482    // Find `FROM` after DELETE.
2483    let from_idx = lowered.find(" from ")?;
2484    let after_from = &trimmed[from_idx + " from ".len()..];
2485    let after_from_lc = &lowered[from_idx + " from ".len()..];
2486
2487    // Cut off the RETURNING tail (a naive search — the RETURNING
2488    // clause only appears once per statement at top level in our
2489    // grammar). Matches whitespace-bounded tokens to avoid clipping
2490    // `RETURNING` inside a string literal.
2491    let mut body = after_from.to_string();
2492    if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
2493        body.truncate(pos);
2494    }
2495    Some(format!("SELECT * FROM {}", body.trim_end()))
2496}
2497
/// Find the byte offset of a whitespace-bounded `needle` in `haystack`,
/// skipping matches inside quoted spans.
///
/// Both single-quoted (`'…'`) and double-quoted (`"…"`) spans are skipped.
/// A quote only closes the span opened by the same character, so an
/// apostrophe inside `"…"` (or a `"` inside `'…'`) no longer flips the
/// scanner into the wrong state and hides every later keyword. SQL's
/// doubled-quote escape (`'it''s'`) works naturally: the pair closes and
/// immediately reopens the span. Backslash escapes are not recognized.
///
/// The comparison is byte-exact, so callers pass an already-lowercased
/// haystack with a lowercase `needle`. An empty `needle` never matches.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let pattern = needle.as_bytes();
    if pattern.is_empty() {
        return None;
    }

    let mut open_quote: Option<u8> = None;
    for (i, &c) in bytes.iter().enumerate() {
        match open_quote {
            Some(quote) => {
                if c == quote {
                    open_quote = None;
                }
                continue;
            }
            None if c == b'\'' || c == b'"' => {
                open_quote = Some(c);
                continue;
            }
            None => {}
        }

        let end = i + pattern.len();
        if bytes.get(i..end) == Some(pattern)
            && (i == 0 || bytes[i - 1].is_ascii_whitespace())
            && bytes.get(end).map_or(true, u8::is_ascii_whitespace)
        {
            return Some(i);
        }
    }
    None
}
2526
2527/// Build a `UnifiedResult` from the rows affected by a DML statement plus
2528/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
2529/// for one affected row; `outputs`, when provided, supplies the engine-
2530/// assigned entity id for the same row (INSERT path). Projection honours
2531/// the RETURNING items: `*` expands to every snapshot column plus
2532/// the public row envelope when available.
2533fn build_returning_result(
2534    items: &[ReturningItem],
2535    snapshots: &[Vec<(String, Value)>],
2536    outputs: Option<&[CreateEntityOutput]>,
2537) -> UnifiedResult {
2538    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2539    let public_item_outputs = outputs.is_some_and(|outs| {
2540        outs.first()
2541            .and_then(|out| out.entity.as_ref())
2542            .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2543    });
2544
2545    let mut columns: Vec<String> = if project_all {
2546        let mut cols: Vec<String> = Vec::new();
2547        if public_item_outputs {
2548            cols.extend(
2549                [
2550                    "rid",
2551                    "collection",
2552                    "kind",
2553                    "tenant",
2554                    "created_at",
2555                    "updated_at",
2556                ]
2557                .into_iter()
2558                .map(str::to_string),
2559            );
2560        } else if outputs.is_some() {
2561            cols.push("rid".to_string());
2562        }
2563        if let Some(first) = snapshots.first() {
2564            for (name, _) in first {
2565                cols.push(name.clone());
2566            }
2567        }
2568        cols
2569    } else {
2570        items
2571            .iter()
2572            .filter_map(|it| match it {
2573                ReturningItem::Column(c) => Some(c.clone()),
2574                ReturningItem::All => None,
2575            })
2576            .collect()
2577    };
2578    // Guarantee unique order-preserving column list.
2579    {
2580        let mut seen = std::collections::HashSet::new();
2581        columns.retain(|c| seen.insert(c.clone()));
2582    }
2583
2584    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2585    for (idx, snap) in snapshots.iter().enumerate() {
2586        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2587        if let Some(outs) = outputs {
2588            if let Some(out) = outs.get(idx) {
2589                if let Some(entity) = out.entity.as_ref() {
2590                    if let Some(kind) = public_returning_item_kind(entity) {
2591                        values.insert(
2592                            Arc::clone(&sys_key_rid()),
2593                            Value::UnsignedInteger(out.id.raw()),
2594                        );
2595                        values.insert(
2596                            Arc::clone(&sys_key_collection()),
2597                            Value::text(entity.kind.collection().to_string()),
2598                        );
2599                        values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2600                        values.insert(
2601                            Arc::clone(&sys_key_created_at()),
2602                            Value::UnsignedInteger(entity.created_at),
2603                        );
2604                        values.insert(
2605                            Arc::clone(&sys_key_updated_at()),
2606                            Value::UnsignedInteger(entity.updated_at),
2607                        );
2608                    } else {
2609                        values.insert(
2610                            Arc::clone(&sys_key_rid()),
2611                            Value::Integer(out.id.raw() as i64),
2612                        );
2613                    }
2614                } else {
2615                    values.insert(
2616                        Arc::clone(&sys_key_rid()),
2617                        Value::Integer(out.id.raw() as i64),
2618                    );
2619                }
2620            }
2621        }
2622        for (name, val) in snap {
2623            values.insert(Arc::from(name.as_str()), val.clone());
2624        }
2625        if !values.contains_key("tenant") {
2626            let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2627            values.insert(Arc::clone(&sys_key_tenant()), tenant);
2628        }
2629        let mut rec = UnifiedRecord::default();
2630        // Only keep projected columns on the record.
2631        for col in &columns {
2632            if let Some(v) = values.get(col.as_str()) {
2633                rec.set_arc(Arc::from(col.as_str()), v.clone());
2634            }
2635        }
2636        records.push(rec);
2637    }
2638
2639    UnifiedResult {
2640        columns,
2641        records,
2642        stats: Default::default(),
2643        pre_serialized_json: None,
2644    }
2645}
2646
2647fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2648    match (&entity.kind, &entity.data) {
2649        (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2650            Some("node")
2651        }
2652        (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2653            Some("edge")
2654        }
2655        (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2656        // #1369 — every entity model must expose its `rid` in RETURNING *.
2657        // Vectors carry their payload in `EntityData::Vector`, not `Row`, so
2658        // they were falling through to a no-rid envelope
2659        // and `RETURNING *` never surfaced the entity-id.
2660        (_, crate::storage::EntityData::Vector(_)) => Some("vector"),
2661        _ => None,
2662    }
2663}
2664
2665fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2666    let Some(row) = entity.data.as_row() else {
2667        return "row";
2668    };
2669
2670    let is_kv = row.named.as_ref().is_some_and(|named| {
2671        (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2672            || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2673    });
2674    if is_kv {
2675        return "kv";
2676    }
2677
2678    let is_document = row
2679        .named
2680        .as_ref()
2681        .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2682        || row.columns.iter().any(runtime_returning_documentish_value);
2683    if is_document {
2684        "document"
2685    } else {
2686        "row"
2687    }
2688}
2689
2690fn runtime_returning_documentish_value(value: &Value) -> bool {
2691    matches!(value, Value::Json(_) | Value::Blob(_))
2692}
2693
2694fn row_insert_returning_snapshots(
2695    outputs: &[CreateEntityOutput],
2696    fallback: Vec<Vec<(String, Value)>>,
2697) -> Vec<Vec<(String, Value)>> {
2698    outputs
2699        .iter()
2700        .enumerate()
2701        .map(|(idx, out)| {
2702            out.entity
2703                .as_ref()
2704                .map(entity_row_fields_snapshot)
2705                .filter(|snap| !snap.is_empty())
2706                .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2707        })
2708        .collect()
2709}
2710
2711fn graph_insert_returning_snapshots(
2712    store: &crate::storage::unified::UnifiedStore,
2713    collection: &str,
2714    ids: &[EntityId],
2715) -> Vec<Vec<(String, Value)>> {
2716    let Some(manager) = store.get_collection(collection) else {
2717        return Vec::new();
2718    };
2719
2720    ids.iter()
2721        .filter_map(|id| manager.get(*id))
2722        .filter_map(|entity| {
2723            let mut record = runtime_any_record_from_entity_ref(&entity)?;
2724            record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2725            Some(record)
2726        })
2727        .map(|record| {
2728            record
2729                .iter_fields()
2730                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2731                .collect()
2732        })
2733        .collect()
2734}
2735
2736fn graph_update_returning_snapshots(
2737    runtime: &RedDBRuntime,
2738    collection: &str,
2739    ids: &[EntityId],
2740) -> Vec<Vec<(String, Value)>> {
2741    let store = runtime.db().store();
2742    let Some(manager) = store.get_collection(collection) else {
2743        return Vec::new();
2744    };
2745
2746    manager
2747        .get_many(ids)
2748        .into_iter()
2749        .flatten()
2750        .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2751        .map(|record| {
2752            record
2753                .iter_fields()
2754                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2755                .collect()
2756        })
2757        .collect()
2758}
2759
2760fn ensure_update_target_contract(
2761    runtime: &RedDBRuntime,
2762    collection: &str,
2763    target: UpdateTarget,
2764) -> RedDBResult<()> {
2765    let Some(contract) = runtime.db().collection_contract(collection) else {
2766        return Ok(());
2767    };
2768    if update_target_contract_is_advisory(&contract)
2769        || update_target_allows_model(contract.declared_model, update_target_model(target))
2770    {
2771        return Ok(());
2772    }
2773    Err(RedDBError::InvalidOperation(format!(
2774        "collection '{}' is declared as '{}' and does not allow '{}' updates",
2775        collection,
2776        update_model_name(contract.declared_model),
2777        update_model_name(update_target_model(target))
2778    )))
2779}
2780
2781fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2782    matches!(
2783        (&contract.origin, &contract.schema_mode),
2784        (
2785            crate::physical::ContractOrigin::Implicit,
2786            crate::catalog::SchemaMode::Dynamic,
2787        )
2788    )
2789}
2790
2791fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2792    match target {
2793        UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2794        UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2795        UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2796        UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2797    }
2798}
2799
2800fn update_target_allows_model(
2801    declared_model: crate::catalog::CollectionModel,
2802    requested_model: crate::catalog::CollectionModel,
2803) -> bool {
2804    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2805}
2806
2807fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2808    match model {
2809        crate::catalog::CollectionModel::Table => "table",
2810        crate::catalog::CollectionModel::Document => "document",
2811        crate::catalog::CollectionModel::Graph => "graph",
2812        crate::catalog::CollectionModel::Vector => "vector",
2813        crate::catalog::CollectionModel::Hll => "hll",
2814        crate::catalog::CollectionModel::Sketch => "sketch",
2815        crate::catalog::CollectionModel::Filter => "filter",
2816        crate::catalog::CollectionModel::Kv => "kv",
2817        crate::catalog::CollectionModel::Config => "config",
2818        crate::catalog::CollectionModel::Vault => "vault",
2819        crate::catalog::CollectionModel::Mixed => "mixed",
2820        crate::catalog::CollectionModel::TimeSeries => "timeseries",
2821        crate::catalog::CollectionModel::Queue => "queue",
2822        crate::catalog::CollectionModel::Metrics => "metrics",
2823    }
2824}
2825
2826fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2827    let db = runtime.db();
2828    if let Some(contract) = db.collection_contract(collection) {
2829        let advisory_implicit_dynamic = matches!(
2830            (&contract.origin, &contract.schema_mode),
2831            (
2832                crate::physical::ContractOrigin::Implicit,
2833                crate::catalog::SchemaMode::Dynamic,
2834            )
2835        );
2836        if advisory_implicit_dynamic
2837            || matches!(
2838                contract.declared_model,
2839                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2840            )
2841        {
2842            return Ok(());
2843        }
2844        return Err(RedDBError::InvalidOperation(format!(
2845            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2846            collection, contract.declared_model
2847        )));
2848    }
2849
2850    let now = std::time::SystemTime::now()
2851        .duration_since(std::time::UNIX_EPOCH)
2852        .unwrap_or_default()
2853        .as_millis();
2854    db.save_collection_contract(crate::physical::CollectionContract {
2855        name: collection.to_string(),
2856        declared_model: crate::catalog::CollectionModel::Graph,
2857        schema_mode: crate::catalog::SchemaMode::Dynamic,
2858        origin: crate::physical::ContractOrigin::Implicit,
2859        version: 1,
2860        created_at_unix_ms: now,
2861        updated_at_unix_ms: now,
2862        default_ttl_ms: db.collection_default_ttl_ms(collection),
2863        vector_dimension: None,
2864        vector_metric: None,
2865        context_index_fields: Vec::new(),
2866        declared_columns: Vec::new(),
2867        table_def: None,
2868        timestamps_enabled: false,
2869        context_index_enabled: false,
2870        metrics_raw_retention_ms: None,
2871        metrics_rollup_policies: Vec::new(),
2872        metrics_tenant_identity: None,
2873        metrics_namespace: None,
2874        append_only: false,
2875        subscriptions: Vec::new(),
2876        analytics_config: Vec::new(),
2877        session_key: None,
2878        session_gap_ms: None,
2879        retention_duration_ms: None,
2880        analytical_storage: None,
2881
2882        ai_policy: None,
2883    })
2884    .map(|_| ())
2885    .map_err(|err| RedDBError::Internal(err.to_string()))
2886}
2887
2888fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2889    query
2890        .assignment_exprs
2891        .iter()
2892        .enumerate()
2893        .any(|(idx, (column, expr))| {
2894            query
2895                .compound_assignment_ops
2896                .get(idx)
2897                .is_some_and(|op| op.is_some())
2898                || expr_references_update_column(expr, &query.table, column)
2899        })
2900}
2901
2902fn evaluate_compound_update_assignment(
2903    column: &str,
2904    record: &UnifiedRecord,
2905    op: BinOp,
2906    rhs: Value,
2907) -> RedDBResult<Value> {
2908    let lhs = record.get(column).ok_or_else(|| {
2909        RedDBError::Query(format!(
2910            "compound assignment requires existing numeric field '{column}'"
2911        ))
2912    })?;
2913    if matches!(lhs, Value::Null) {
2914        return Err(RedDBError::Query(format!(
2915            "compound assignment requires non-null numeric field '{column}'"
2916        )));
2917    }
2918    apply_compound_numeric_op(column, op, lhs, &rhs)
2919}
2920
2921fn apply_compound_numeric_op(
2922    column: &str,
2923    op: BinOp,
2924    lhs: &Value,
2925    rhs: &Value,
2926) -> RedDBResult<Value> {
2927    let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2928        return Err(RedDBError::Query(format!(
2929            "compound assignment requires numeric field '{column}'"
2930        )));
2931    };
2932    let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2933        return Err(RedDBError::Query(format!(
2934            "compound assignment requires numeric right-hand value for field '{column}'"
2935        )));
2936    };
2937
2938    if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2939        let a = lhs_number.as_f64();
2940        let b = rhs_number.as_f64();
2941        let out = match op {
2942            BinOp::Add => a + b,
2943            BinOp::Sub => a - b,
2944            BinOp::Mul => a * b,
2945            BinOp::Div => {
2946                if b == 0.0 {
2947                    return Err(RedDBError::Query(format!(
2948                        "division by zero in compound assignment for field '{column}'"
2949                    )));
2950                }
2951                a / b
2952            }
2953            BinOp::Mod => {
2954                if b == 0.0 {
2955                    return Err(RedDBError::Query(format!(
2956                        "modulo by zero in compound assignment for field '{column}'"
2957                    )));
2958                }
2959                a % b
2960            }
2961            _ => {
2962                return Err(RedDBError::Query(format!(
2963                    "unsupported compound assignment operator for field '{column}'"
2964                )));
2965            }
2966        };
2967        if !out.is_finite() {
2968            return Err(RedDBError::Query(format!(
2969                "numeric overflow in compound assignment for field '{column}'"
2970            )));
2971        }
2972        return Ok(Value::Float(out));
2973    }
2974
2975    let a = lhs_number.as_i128();
2976    let b = rhs_number.as_i128();
2977    let out = match op {
2978        BinOp::Add => a.checked_add(b),
2979        BinOp::Sub => a.checked_sub(b),
2980        BinOp::Mul => a.checked_mul(b),
2981        BinOp::Mod => {
2982            if b == 0 {
2983                return Err(RedDBError::Query(format!(
2984                    "modulo by zero in compound assignment for field '{column}'"
2985                )));
2986            }
2987            a.checked_rem(b)
2988        }
2989        BinOp::Div => unreachable!("integer division is handled by the float branch"),
2990        _ => None,
2991    }
2992    .ok_or_else(|| {
2993        RedDBError::Query(format!(
2994            "numeric overflow in compound assignment for field '{column}'"
2995        ))
2996    })?;
2997
2998    if matches!(lhs, Value::UnsignedInteger(_)) {
2999        let value = u64::try_from(out).map_err(|_| {
3000            RedDBError::Query(format!(
3001                "numeric overflow in compound assignment for field '{column}'"
3002            ))
3003        })?;
3004        Ok(Value::UnsignedInteger(value))
3005    } else {
3006        let value = i64::try_from(out).map_err(|_| {
3007            RedDBError::Query(format!(
3008                "numeric overflow in compound assignment for field '{column}'"
3009            ))
3010        })?;
3011        Ok(Value::Integer(value))
3012    }
3013}
3014
3015#[derive(Clone, Copy)]
3016enum CompoundNumber {
3017    Integer(i128),
3018    Float(f64),
3019}
3020
3021impl CompoundNumber {
3022    fn from_value(value: &Value) -> Option<Self> {
3023        match value {
3024            Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
3025            Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
3026            Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
3027            Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
3028            _ => None,
3029        }
3030    }
3031
3032    fn is_float(self) -> bool {
3033        matches!(self, Self::Float(_))
3034    }
3035
3036    fn as_f64(self) -> f64 {
3037        match self {
3038            Self::Integer(value) => value as f64,
3039            Self::Float(value) => value,
3040        }
3041    }
3042
3043    fn as_i128(self) -> i128 {
3044        match self {
3045            Self::Integer(value) => value,
3046            Self::Float(_) => unreachable!("float compound number used as integer"),
3047        }
3048    }
3049}
3050
3051fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
3052    match expr {
3053        Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
3054        Expr::Column { field, .. } => {
3055            field_ref_matches_update_column(field, table_name, target_column)
3056        }
3057        Expr::BinaryOp { lhs, rhs, .. } => {
3058            expr_references_update_column(lhs, table_name, target_column)
3059                || expr_references_update_column(rhs, table_name, target_column)
3060        }
3061        Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
3062            expr_references_update_column(operand, table_name, target_column)
3063        }
3064        Expr::FunctionCall { args, .. } => args
3065            .iter()
3066            .any(|arg| expr_references_update_column(arg, table_name, target_column)),
3067        Expr::Case {
3068            branches, else_, ..
3069        } => {
3070            branches.iter().any(|(cond, value)| {
3071                expr_references_update_column(cond, table_name, target_column)
3072                    || expr_references_update_column(value, table_name, target_column)
3073            }) || else_
3074                .as_deref()
3075                .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
3076        }
3077        Expr::IsNull { operand, .. } => {
3078            expr_references_update_column(operand, table_name, target_column)
3079        }
3080        Expr::InList { target, values, .. } => {
3081            expr_references_update_column(target, table_name, target_column)
3082                || values
3083                    .iter()
3084                    .any(|value| expr_references_update_column(value, table_name, target_column))
3085        }
3086        Expr::Between {
3087            target, low, high, ..
3088        } => {
3089            expr_references_update_column(target, table_name, target_column)
3090                || expr_references_update_column(low, table_name, target_column)
3091                || expr_references_update_column(high, table_name, target_column)
3092        }
3093        Expr::WindowFunctionCall { args, window, .. } => {
3094            args.iter()
3095                .any(|arg| expr_references_update_column(arg, table_name, target_column))
3096                || window
3097                    .partition_by
3098                    .iter()
3099                    .any(|e| expr_references_update_column(e, table_name, target_column))
3100                || window
3101                    .order_by
3102                    .iter()
3103                    .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
3104        }
3105    }
3106}
3107
3108fn field_ref_matches_update_column(
3109    field: &FieldRef,
3110    table_name: &str,
3111    target_column: &str,
3112) -> bool {
3113    match field {
3114        FieldRef::TableColumn { table, column } => {
3115            column.eq_ignore_ascii_case(target_column)
3116                && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
3117        }
3118        FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
3119            false
3120        }
3121    }
3122}
3123
3124fn resolve_update_entity_by_logical_id(
3125    runtime: &RedDBRuntime,
3126    table: &str,
3127    logical_id: EntityId,
3128) -> Option<UnifiedEntity> {
3129    let store = runtime.inner.db.store();
3130    if let Some(entity) = store.get_table_row_by_logical_id(table, logical_id) {
3131        return Some(entity);
3132    }
3133    // Fallback for non-table-row entities (graph nodes/edges, etc.) where
3134    // entity_id == logical_id and the MVCC table-row resolver doesn't apply.
3135    store.get(table, logical_id)
3136}
3137
3138fn update_cdc_item_kind(
3139    runtime: &RedDBRuntime,
3140    collection: &str,
3141    entity: &UnifiedEntity,
3142) -> &'static str {
3143    match &entity.data {
3144        EntityData::Node(_) => return "node",
3145        EntityData::Edge(_) => return "edge",
3146        _ => {}
3147    }
3148
3149    match runtime
3150        .db()
3151        .collection_contract(collection)
3152        .map(|contract| contract.declared_model)
3153    {
3154        Some(crate::catalog::CollectionModel::Document) => "document",
3155        Some(crate::catalog::CollectionModel::Kv)
3156        | Some(crate::catalog::CollectionModel::Vault) => "kv",
3157        _ => "row",
3158    }
3159}
3160
3161fn ordered_update_target_ids(
3162    manager: &Arc<crate::storage::SegmentManager>,
3163    entity_ids: &[EntityId],
3164    order_by: &[OrderByClause],
3165    limit: Option<usize>,
3166) -> Vec<EntityId> {
3167    let mut entities: Vec<UnifiedEntity> =
3168        manager.get_many(entity_ids).into_iter().flatten().collect();
3169    entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3170    if let Some(limit) = limit {
3171        entities.truncate(limit);
3172    }
3173    entities.into_iter().map(|entity| entity.id).collect()
3174}
3175
3176fn compare_update_order(
3177    left: &UnifiedEntity,
3178    right: &UnifiedEntity,
3179    order_by: &[OrderByClause],
3180) -> Ordering {
3181    for clause in order_by {
3182        let left_value = update_order_value(left, &clause.field);
3183        let right_value = update_order_value(right, &clause.field);
3184        let ordering = compare_update_order_values(
3185            left_value.as_ref(),
3186            right_value.as_ref(),
3187            clause.nulls_first,
3188        );
3189        if ordering != Ordering::Equal {
3190            return if clause.ascending {
3191                ordering
3192            } else {
3193                ordering.reverse()
3194            };
3195        }
3196    }
3197    left.logical_id().raw().cmp(&right.logical_id().raw())
3198}
3199
3200fn compare_update_order_values(
3201    left: Option<&Value>,
3202    right: Option<&Value>,
3203    nulls_first: bool,
3204) -> Ordering {
3205    match (left, right) {
3206        (None, None) => Ordering::Equal,
3207        (None, Some(_)) => {
3208            if nulls_first {
3209                Ordering::Less
3210            } else {
3211                Ordering::Greater
3212            }
3213        }
3214        (Some(_), None) => {
3215            if nulls_first {
3216                Ordering::Greater
3217            } else {
3218                Ordering::Less
3219            }
3220        }
3221        (Some(left), Some(right)) => {
3222            crate::storage::query::value_compare::total_compare_values(left, right)
3223        }
3224    }
3225}
3226
3227fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3228    let FieldRef::TableColumn { table, column } = field else {
3229        return None;
3230    };
3231    if !table.is_empty() {
3232        return None;
3233    }
3234    if column.eq_ignore_ascii_case("rid") {
3235        return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3236    }
3237    match &entity.data {
3238        EntityData::Row(row) => row.get_field(column).cloned(),
3239        EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3240            .and_then(|record| record.get(column).cloned()),
3241        _ => None,
3242    }
3243}
3244
/// Drop repeated column names, comparing case-insensitively (ASCII).
///
/// The first spelling of each name is kept, and the original order is
/// preserved.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(columns.len());
    for candidate in columns {
        let already_seen = kept
            .iter()
            .any(|prior| prior.eq_ignore_ascii_case(&candidate));
        if !already_seen {
            kept.push(candidate);
        }
    }
    kept
}
3261
3262// =============================================================================
3263// Helper functions for extracting typed values from column/value pairs
3264// =============================================================================
3265
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Map a SQL column name onto its TTL metadata key, matching
/// case-insensitively (ASCII). Returns `None` for ordinary columns.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|ttl_key| column.eq_ignore_ascii_case(ttl_key))
}
3279
3280/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
3281/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
3282/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
3283/// `_ttl_ms` and `_expires_at` are passed through.
3284fn canonicalize_sql_ttl_metadata(
3285    key: &'static str,
3286    value: MetadataValue,
3287) -> (&'static str, MetadataValue) {
3288    if key != "_ttl" {
3289        return (key, value);
3290    }
3291    let scaled = match value {
3292        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3293        MetadataValue::Timestamp(ms_or_s) => {
3294            // Timestamp is already chosen for very large values; treat as
3295            // already-ms to avoid silent overflow.
3296            MetadataValue::Timestamp(ms_or_s)
3297        }
3298        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3299        other => other,
3300    };
3301    ("_ttl_ms", scaled)
3302}
3303
/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
/// `SECRET('...')` literals. The runtime strips this marker and
/// applies the actual crypto transform during INSERT execution.
///
/// `RedDBRuntime::resolve_crypto_sentinel` only transforms values that
/// start with this prefix; values without it are passed through unchanged.
/// For `SECRET` the prefix is matched against the raw bytes (`as_bytes()`).
pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3308
3309impl RedDBRuntime {
3310    /// Strip the plaintext sentinel from a `Value::Password` or
3311    /// `Value::Secret` produced by the parser and apply the real
3312    /// crypto transform. `Password` is always hashed with argon2id.
3313    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
3314    /// when `red.config.secret.auto_encrypt = true` (default).
3315    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3316        match value {
3317            Value::Password(marked) => {
3318                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3319                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
3320                } else {
3321                    Ok(Value::Password(marked))
3322                }
3323            }
3324            Value::Secret(bytes) => {
3325                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3326                    if !self.secret_auto_encrypt() {
3327                        return Err(RedDBError::Query(
3328                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
3329                             is false. Insert pre-encrypted bytes directly instead."
3330                                .to_string(),
3331                        ));
3332                    }
3333                    let key = self.secret_aes_key().ok_or_else(|| {
3334                        RedDBError::Query(
3335                            "SECRET() column encryption requires a bootstrapped \
3336                             vault (red.secret.aes_key is missing). Start the server \
3337                             with --vault to enable."
3338                                .to_string(),
3339                        )
3340                    })?;
3341                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3342                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3343                } else {
3344                    Ok(Value::Secret(bytes))
3345                }
3346            }
3347            other => Ok(other),
3348        }
3349    }
3350}
3351
3352/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
3353/// This is the on-disk representation of `Value::Secret`.
3354fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3355    let nonce_bytes = crate::auth::store::random_bytes(12);
3356    let mut nonce = [0u8; 12];
3357    nonce.copy_from_slice(&nonce_bytes[..12]);
3358    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3359    let mut out = Vec::with_capacity(12 + ct.len());
3360    out.extend_from_slice(&nonce);
3361    out.extend_from_slice(&ct);
3362    out
3363}
3364
3365/// Decode a `Value::Secret` payload back to plaintext. Returns
3366/// `None` when the payload is too short or AES-GCM authentication
3367/// fails (tampered or wrong key).
3368pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3369    if payload.len() < 12 {
3370        return None;
3371    }
3372    let mut nonce = [0u8; 12];
3373    nonce.copy_from_slice(&payload[..12]);
3374    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3375}
3376
3377fn split_insert_metadata(
3378    runtime: &RedDBRuntime,
3379    columns: &[String],
3380    values: &[Value],
3381) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3382    let mut fields = Vec::new();
3383    let mut metadata = Vec::new();
3384
3385    for (column, value) in columns.iter().zip(values.iter()) {
3386        // Still support legacy _ttl columns for backward compat
3387        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3388            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3389            let (canonical_key, canonical_value) =
3390                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3391            metadata.push((canonical_key.to_string(), canonical_value));
3392            continue;
3393        }
3394        fields.push((
3395            column.clone(),
3396            runtime.resolve_crypto_sentinel(value.clone())?,
3397        ));
3398    }
3399
3400    Ok((fields, metadata))
3401}
3402
3403/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
3404fn merge_with_clauses(
3405    metadata: &mut Vec<(String, MetadataValue)>,
3406    ttl_ms: Option<u64>,
3407    expires_at_ms: Option<u64>,
3408    with_metadata: &[(String, Value)],
3409) {
3410    if let Some(ms) = ttl_ms {
3411        metadata.push((
3412            "_ttl_ms".to_string(),
3413            if ms <= i64::MAX as u64 {
3414                MetadataValue::Int(ms as i64)
3415            } else {
3416                MetadataValue::Timestamp(ms)
3417            },
3418        ));
3419    }
3420    if let Some(ms) = expires_at_ms {
3421        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3422    }
3423    for (key, value) in with_metadata {
3424        let meta_value = match value {
3425            Value::Text(s) => MetadataValue::String(s.to_string()),
3426            Value::Integer(n) => MetadataValue::Int(*n),
3427            Value::Float(n) => MetadataValue::Float(*n),
3428            Value::Boolean(b) => MetadataValue::Bool(*b),
3429            _ => MetadataValue::String(value.to_string()),
3430        };
3431        metadata.push((key.clone(), meta_value));
3432    }
3433}
3434
3435fn merge_vector_metadata_column(
3436    metadata: &mut Vec<(String, MetadataValue)>,
3437    columns: &[String],
3438    values: &[Value],
3439) -> RedDBResult<()> {
3440    let Some(value) = columns
3441        .iter()
3442        .position(|column| column.eq_ignore_ascii_case("metadata"))
3443        .map(|index| &values[index])
3444    else {
3445        return Ok(());
3446    };
3447    let json = match value {
3448        Value::Null => return Ok(()),
3449        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3450            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3451        })?,
3452        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3453            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3454        })?,
3455        other => {
3456            return Err(RedDBError::Query(format!(
3457                "column 'metadata' expected JSON object, got {other:?}"
3458            )))
3459        }
3460    };
3461    let parsed = metadata_from_json(&json)?;
3462    for (key, value) in parsed.iter() {
3463        metadata.push((key.clone(), value.clone()));
3464    }
3465    Ok(())
3466}
3467
3468fn apply_collection_default_ttl_metadata(
3469    runtime: &RedDBRuntime,
3470    collection: &str,
3471    metadata: &mut Vec<(String, MetadataValue)>,
3472) {
3473    if has_internal_ttl_metadata(metadata) {
3474        return;
3475    }
3476
3477    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3478        return;
3479    };
3480
3481    metadata.push((
3482        "_ttl_ms".to_string(),
3483        if default_ttl_ms <= i64::MAX as u64 {
3484            MetadataValue::Int(default_ttl_ms as i64)
3485        } else {
3486            MetadataValue::Timestamp(default_ttl_ms)
3487        },
3488    ));
3489}
3490
3491fn ensure_non_tree_reserved_metadata_entries(
3492    metadata: &[(String, MetadataValue)],
3493) -> RedDBResult<()> {
3494    for (key, _) in metadata {
3495        ensure_non_tree_reserved_metadata_key(key)?;
3496    }
3497    Ok(())
3498}
3499
3500fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3501    if key.starts_with(TREE_METADATA_PREFIX) {
3502        return Err(RedDBError::Query(format!(
3503            "metadata key '{}' is reserved for managed trees",
3504            key
3505        )));
3506    }
3507    Ok(())
3508}
3509
3510fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3511    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3512        return Err(RedDBError::Query(format!(
3513            "edge label '{}' is reserved for managed trees",
3514            TREE_CHILD_EDGE_LABEL
3515        )));
3516    }
3517    Ok(())
3518}
3519
3520fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3521    let mut columns = Vec::with_capacity(pairs.len());
3522    let mut values = Vec::with_capacity(pairs.len());
3523
3524    for (column, value) in pairs {
3525        columns.push(column.clone());
3526        values.push(value.clone());
3527    }
3528
3529    (columns, values)
3530}
3531
3532/// Find a required column value and return it as-is.
3533fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3534    for (i, col) in columns.iter().enumerate() {
3535        if col.eq_ignore_ascii_case(name) {
3536            return Ok(values[i].clone());
3537        }
3538    }
3539    Err(RedDBError::Query(format!(
3540        "required column '{name}' not found in INSERT"
3541    )))
3542}
3543
3544/// Find a required column value and coerce to String.
3545fn find_column_value_string(
3546    columns: &[String],
3547    values: &[Value],
3548    name: &str,
3549) -> RedDBResult<String> {
3550    let val = find_column_value(columns, values, name)?;
3551    match val {
3552        Value::Text(s) => Ok(s.to_string()),
3553        Value::Integer(n) => Ok(n.to_string()),
3554        Value::Float(n) => Ok(n.to_string()),
3555        other => Err(RedDBError::Query(format!(
3556            "column '{name}' expected text, got {other:?}"
3557        ))),
3558    }
3559}
3560
3561fn find_document_body_json(
3562    columns: &[String],
3563    values: &[Value],
3564) -> RedDBResult<crate::json::Value> {
3565    let val = find_column_value(columns, values, "body")?;
3566    match val {
3567        Value::Json(bytes) | Value::Blob(bytes) => crate::json::from_slice(&bytes)
3568            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3569        Value::Text(text) => crate::json::from_str(text.as_ref())
3570            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3571        Value::Integer(value) => crate::json::from_str(&value.to_string())
3572            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3573        Value::UnsignedInteger(value) => crate::json::from_str(&value.to_string())
3574            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3575        Value::Float(value) => crate::json::from_str(&value.to_string())
3576            .map_err(|err| RedDBError::Query(format!("invalid JSON body: {err}"))),
3577        other => Err(RedDBError::Query(format!(
3578            "column 'body' expected JSON body, got {other:?}"
3579        ))),
3580    }
3581}
3582
3583fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3584    let val = find_column_value(columns, values, name)?;
3585    match val {
3586        Value::Float(n) => Ok(n),
3587        Value::Integer(n) => Ok(n as f64),
3588        Value::UnsignedInteger(n) => Ok(n as f64),
3589        Value::Text(s) => s
3590            .parse::<f64>()
3591            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3592        other => Err(RedDBError::Query(format!(
3593            "column '{name}' expected number, got {other:?}"
3594        ))),
3595    }
3596}
3597
3598/// Find an optional column value as String.
3599fn find_column_value_opt_string(
3600    columns: &[String],
3601    values: &[Value],
3602    name: &str,
3603) -> Option<String> {
3604    for (i, col) in columns.iter().enumerate() {
3605        if col.eq_ignore_ascii_case(name) {
3606            return match &values[i] {
3607                Value::Null => None,
3608                Value::Text(s) => Some(s.to_string()),
3609                Value::Integer(n) => Some(n.to_string()),
3610                Value::Float(n) => Some(n.to_string()),
3611                _ => None,
3612            };
3613        }
3614    }
3615    None
3616}
3617
3618/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
3619///
3620/// Accepts integer literals, decimal strings, and node labels resolved via
3621/// the per-collection graph label index (same source of truth that
3622/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
3623/// labels error so callers can fall back to the numeric id form.
3624fn resolve_edge_endpoint(
3625    store: &crate::storage::unified::UnifiedStore,
3626    collection: &str,
3627    columns: &[String],
3628    values: &[Value],
3629    name: &str,
3630) -> RedDBResult<u64> {
3631    let val = find_column_value(columns, values, name)?;
3632    match val {
3633        Value::Integer(n) => Ok(n as u64),
3634        Value::UnsignedInteger(n) => Ok(n),
3635        Value::Text(s) => {
3636            if let Ok(n) = s.parse::<u64>() {
3637                return Ok(n);
3638            }
3639            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3640            match matches.len() {
3641                0 => Err(RedDBError::Query(format!(
3642                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3643                ))),
3644                1 => Ok(matches[0].raw()),
3645                n => Err(RedDBError::Query(format!(
3646                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3647                ))),
3648            }
3649        }
3650        other => Err(RedDBError::Query(format!(
3651            "column '{name}' expected integer or node label, got {other:?}"
3652        ))),
3653    }
3654}
3655
3656fn resolve_edge_endpoint_any(
3657    store: &crate::storage::unified::UnifiedStore,
3658    collection: &str,
3659    columns: &[String],
3660    values: &[Value],
3661    names: &[&str],
3662) -> RedDBResult<u64> {
3663    for name in names {
3664        if columns
3665            .iter()
3666            .any(|column| column.eq_ignore_ascii_case(name))
3667        {
3668            return resolve_edge_endpoint(store, collection, columns, values, name);
3669        }
3670    }
3671
3672    Err(RedDBError::Query(format!(
3673        "required column '{}' not found in INSERT",
3674        names.first().copied().unwrap_or("from_rid")
3675    )))
3676}
3677
3678/// Find a required column value and coerce to u64.
3679fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3680    let val = find_column_value(columns, values, name)?;
3681    match val {
3682        Value::Integer(n) => Ok(n as u64),
3683        Value::UnsignedInteger(n) => Ok(n),
3684        Value::Text(s) => s
3685            .parse::<u64>()
3686            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3687        other => Err(RedDBError::Query(format!(
3688            "column '{name}' expected integer, got {other:?}"
3689        ))),
3690    }
3691}
3692
3693/// Find an optional column value as f32.
3694fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3695    for (i, col) in columns.iter().enumerate() {
3696        if col.eq_ignore_ascii_case(name) {
3697            return match &values[i] {
3698                Value::Float(n) => Some(*n as f32),
3699                Value::Integer(n) => Some(*n as f32),
3700                Value::Null => None,
3701                _ => None,
3702            };
3703        }
3704    }
3705    None
3706}
3707
3708/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
3709fn find_column_value_vec_f32(
3710    columns: &[String],
3711    values: &[Value],
3712    name: &str,
3713) -> RedDBResult<Vec<f32>> {
3714    let val = find_column_value(columns, values, name)?;
3715    match val {
3716        Value::Vector(v) => Ok(v),
3717        Value::Json(bytes) => {
3718            // Try to parse as JSON array of numbers
3719            let s = std::str::from_utf8(&bytes).map_err(|_| {
3720                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3721            })?;
3722            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3723                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3724            })?;
3725            Ok(arr)
3726        }
3727        other => Err(RedDBError::Query(format!(
3728            "column '{name}' expected vector, got {other:?}"
3729        ))),
3730    }
3731}
3732
3733fn find_column_value_vec_f32_any(
3734    columns: &[String],
3735    values: &[Value],
3736    names: &[&str],
3737) -> RedDBResult<Vec<f32>> {
3738    for name in names {
3739        if columns
3740            .iter()
3741            .any(|column| column.eq_ignore_ascii_case(name))
3742        {
3743            return find_column_value_vec_f32(columns, values, name);
3744        }
3745    }
3746    Err(RedDBError::Query(format!(
3747        "required vector column '{}' not found in INSERT",
3748        names.join("' or '")
3749    )))
3750}
3751
3752/// Extract remaining properties (all columns not in the exclusion list).
3753fn extract_remaining_properties(
3754    columns: &[String],
3755    values: &[Value],
3756    exclude: &[&str],
3757) -> Vec<(String, Value)> {
3758    columns
3759        .iter()
3760        .zip(values.iter())
3761        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3762        .map(|(col, val)| (col.clone(), val.clone()))
3763        .collect()
3764}
3765
3766fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3767    let mut invalid = Vec::new();
3768    for column in columns {
3769        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3770            invalid.push(column.clone());
3771        }
3772    }
3773
3774    if invalid.is_empty() {
3775        Ok(())
3776    } else {
3777        Err(RedDBError::Query(format!(
3778            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3779            invalid.join(", ")
3780        )))
3781    }
3782}
3783
/// Return `true` when `column` (ASCII case-insensitive) is one of the
/// column names a timeseries INSERT recognizes natively.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        // Analytics-event extension (#577): an analytics row carries
        // an `event_name` + JSON `payload`. The payload is validated
        // against the AnalyticsSchemaRegistry inside
        // `insert_timeseries_point` before the row lands.
        "event_name",
        "payload",
    ];
    ACCEPTED
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
3801
3802fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3803    let mut found = None;
3804
3805    for alias in ["timestamp_ns", "timestamp", "time"] {
3806        for (index, column) in columns.iter().enumerate() {
3807            if !column.eq_ignore_ascii_case(alias) {
3808                continue;
3809            }
3810
3811            if found.is_some() {
3812                return Err(RedDBError::Query(
3813                    "timeseries INSERT accepts only one timestamp column".to_string(),
3814                ));
3815            }
3816
3817            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3818        }
3819    }
3820
3821    Ok(found)
3822}
3823
3824fn find_timeseries_tags(
3825    columns: &[String],
3826    values: &[Value],
3827) -> RedDBResult<std::collections::HashMap<String, String>> {
3828    for (index, column) in columns.iter().enumerate() {
3829        if column.eq_ignore_ascii_case("tags") {
3830            return parse_timeseries_tags(&values[index]);
3831        }
3832    }
3833    Ok(std::collections::HashMap::new())
3834}
3835
3836fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3837    match value {
3838        Value::Null => Ok(std::collections::HashMap::new()),
3839        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3840        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3841        other => Err(RedDBError::Query(format!(
3842            "timeseries tags must be a JSON object or JSON text, got {other:?}"
3843        ))),
3844    }
3845}
3846
3847fn parse_timeseries_tags_json(
3848    bytes: &[u8],
3849) -> RedDBResult<std::collections::HashMap<String, String>> {
3850    let json: crate::json::Value = crate::json::from_slice(bytes)
3851        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3852
3853    let object = match json {
3854        crate::json::Value::Object(object) => object,
3855        other => {
3856            return Err(RedDBError::Query(format!(
3857                "timeseries tags must be a JSON object, got {other:?}"
3858            )))
3859        }
3860    };
3861
3862    let mut tags = std::collections::HashMap::with_capacity(object.len());
3863    for (key, value) in object {
3864        tags.insert(key, json_tag_value_to_string(&value));
3865    }
3866    Ok(tags)
3867}
3868
3869/// Encode a tag value for storage so the original JSON type can be
3870/// recovered on read (issue #543).
3871///
3872/// Time-series tags are stored as `HashMap<String, String>` on the
3873/// physical record (see [`crate::storage::TimeSeriesData`]) so that
3874/// the segment codec, WAL and gRPC mirrors don't need a new value
3875/// variant. To preserve the original JSON type across that
3876/// string-only channel we prepend the
3877/// [`crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX`] marker
3878/// and serialize the value as compact JSON text. The read paths
3879/// (`timeseries_tags_json_value` / `timeseries_tags_value`) detect
3880/// the marker, parse the suffix, and recover a real JSON value.
3881/// Tags written through other channels (Prometheus remote write,
3882/// metrics handlers, legacy on-disk data) lack the marker and are
3883/// returned as `JsonValue::String(raw)` exactly as before.
3884fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3885    let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3886    buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3887    buf.push_str(&value.to_string_compact());
3888    buf
3889}
3890
3891fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3892    match value {
3893        Value::UnsignedInteger(value) => Ok(*value),
3894        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3895        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3896        Value::Text(value) => value.parse::<u64>().map_err(|_| {
3897            RedDBError::Query(format!(
3898                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3899            ))
3900        }),
3901        other => Err(RedDBError::Query(format!(
3902            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3903        ))),
3904    }
3905}
3906
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// A clock set before 1970 yields 0. A nanosecond count beyond `u64::MAX`
/// saturates to `u64::MAX`.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
3914
3915fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3916    use crate::json::{Map, Value as JV};
3917    match value {
3918        MetadataValue::Null => JV::Null,
3919        MetadataValue::Bool(value) => JV::Bool(*value),
3920        MetadataValue::Int(value) => JV::Number(*value as f64),
3921        MetadataValue::Float(value) => JV::Number(*value),
3922        MetadataValue::String(value) => JV::String(value.clone()),
3923        MetadataValue::Bytes(value) => JV::Array(
3924            value
3925                .iter()
3926                .map(|value| JV::Number(*value as f64))
3927                .collect(),
3928        ),
3929        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3930        MetadataValue::Array(values) => {
3931            JV::Array(values.iter().map(metadata_value_to_json).collect())
3932        }
3933        MetadataValue::Object(object) => {
3934            let entries = object
3935                .iter()
3936                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3937                .collect();
3938            JV::Object(entries)
3939        }
3940        MetadataValue::Geo { lat, lon } => {
3941            let mut object = Map::new();
3942            object.insert("lat".to_string(), JV::Number(*lat));
3943            object.insert("lon".to_string(), JV::Number(*lon));
3944            JV::Object(object)
3945        }
3946        MetadataValue::Reference(target) => {
3947            let mut object = Map::new();
3948            object.insert(
3949                "collection".to_string(),
3950                JV::String(target.collection().to_string()),
3951            );
3952            object.insert(
3953                "entity_id".to_string(),
3954                JV::Number(target.entity_id().raw() as f64),
3955            );
3956            JV::Object(object)
3957        }
3958        MetadataValue::References(values) => {
3959            let refs = values
3960                .iter()
3961                .map(|target| {
3962                    let mut object = Map::new();
3963                    object.insert(
3964                        "collection".to_string(),
3965                        JV::String(target.collection().to_string()),
3966                    );
3967                    object.insert(
3968                        "entity_id".to_string(),
3969                        JV::Number(target.entity_id().raw() as f64),
3970                    );
3971                    JV::Object(object)
3972                })
3973                .collect();
3974            JV::Array(refs)
3975        }
3976    }
3977}
3978
3979fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3980    match value {
3981        Value::Null => MetadataValue::Null,
3982        Value::Boolean(value) => MetadataValue::Bool(*value),
3983        Value::Integer(value) => MetadataValue::Int(*value),
3984        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3985        Value::Float(value) => MetadataValue::Float(*value),
3986        Value::Text(value) => MetadataValue::String(value.to_string()),
3987        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3988        Value::Timestamp(value) => {
3989            if *value >= 0 {
3990                metadata_u64_to_value(*value as u64)
3991            } else {
3992                MetadataValue::Int(*value)
3993            }
3994        }
3995        Value::TimestampMs(value) => {
3996            if *value >= 0 {
3997                metadata_u64_to_value(*value as u64)
3998            } else {
3999                MetadataValue::Int(*value)
4000            }
4001        }
4002        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
4003        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
4004        Value::Date(value) => MetadataValue::String(value.to_string()),
4005        Value::Time(value) => MetadataValue::String(value.to_string()),
4006        Value::Decimal(value) => MetadataValue::String(value.to_string()),
4007        Value::Ipv4(value) => MetadataValue::String(format!(
4008            "{}.{}.{}.{}",
4009            (value >> 24) & 0xFF,
4010            (value >> 16) & 0xFF,
4011            (value >> 8) & 0xFF,
4012            value & 0xFF
4013        )),
4014        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
4015        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
4016        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
4017        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
4018            lat: *lat as f64 / 1_000_000.0,
4019            lon: *lon as f64 / 1_000_000.0,
4020        },
4021        Value::BigInt(value) => MetadataValue::String(value.to_string()),
4022        Value::TableRef(value) => MetadataValue::String(value.clone()),
4023        Value::PageRef(value) => MetadataValue::Int(*value as i64),
4024        Value::Password(value) => MetadataValue::String(value.clone()),
4025        Value::Array(values) => {
4026            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
4027        }
4028        _ => MetadataValue::String(value.to_string()),
4029    }
4030}
4031
4032fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
4033    match value {
4034        Value::Null => Ok(MetadataValue::Null),
4035        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
4036        Value::Integer(_) => Err(RedDBError::Query(format!(
4037            "column '{field}' must be non-negative for TTL metadata"
4038        ))),
4039        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
4040        Value::Float(value) if value.is_finite() => {
4041            if value.fract().abs() >= f64::EPSILON {
4042                return Err(RedDBError::Query(format!(
4043                    "column '{field}' must be an integer (TTL metadata must be an integer)"
4044                )));
4045            }
4046            if *value < 0.0 {
4047                return Err(RedDBError::Query(format!(
4048                    "column '{field}' must be non-negative for TTL metadata"
4049                )));
4050            }
4051            if *value > u64::MAX as f64 {
4052                return Err(RedDBError::Query(format!(
4053                    "column '{field}' value is too large"
4054                )));
4055            }
4056            Ok(metadata_u64_to_value(*value as u64))
4057        }
4058        Value::Float(_) => Err(RedDBError::Query(format!(
4059            "column '{field}' must be a finite number"
4060        ))),
4061        Value::Text(value) => {
4062            let value = value.trim();
4063            if let Ok(value) = value.parse::<u64>() {
4064                Ok(metadata_u64_to_value(value))
4065            } else if let Ok(value) = value.parse::<i64>() {
4066                if value < 0 {
4067                    return Err(RedDBError::Query(format!(
4068                        "column '{field}' must be non-negative for TTL metadata"
4069                    )));
4070                }
4071                Ok(metadata_u64_to_value(value as u64))
4072            } else if let Ok(value) = value.parse::<f64>() {
4073                if !value.is_finite() {
4074                    return Err(RedDBError::Query(format!(
4075                        "column '{field}' must be a finite number"
4076                    )));
4077                }
4078                if value.fract().abs() >= f64::EPSILON {
4079                    return Err(RedDBError::Query(format!(
4080                        "column '{field}' must be an integer (TTL metadata must be an integer)"
4081                    )));
4082                }
4083                if value < 0.0 {
4084                    return Err(RedDBError::Query(format!(
4085                        "column '{field}' must be non-negative for TTL metadata"
4086                    )));
4087                }
4088                if value > u64::MAX as f64 {
4089                    return Err(RedDBError::Query(format!(
4090                        "column '{field}' value is too large"
4091                    )));
4092                }
4093                Ok(metadata_u64_to_value(value as u64))
4094            } else {
4095                Err(RedDBError::Query(format!(
4096                    "column '{field}' expects a numeric value for TTL metadata"
4097                )))
4098            }
4099        }
4100        _ => Err(RedDBError::Query(format!(
4101            "column '{field}' expects a numeric value for TTL metadata"
4102        ))),
4103    }
4104}
4105
4106fn metadata_u64_to_value(value: u64) -> MetadataValue {
4107    if value <= i64::MAX as u64 {
4108        MetadataValue::Int(value as i64)
4109    } else {
4110        MetadataValue::Timestamp(value)
4111    }
4112}
4113
4114/// Phase 2 PG parity: inspect a column value and return `true` when
4115/// the dotted `tail` path is already present under it. Used by the
4116/// tenant auto-fill so rows that already carry an explicit value
4117/// (bulk import, admin insert on behalf of a tenant) are not
4118/// double-stamped with the session's current_tenant().
4119fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
4120    let json = match value {
4121        Value::Null => return false,
4122        Value::Json(bytes) | Value::Blob(bytes) => {
4123            match crate::json::from_slice::<crate::json::Value>(bytes) {
4124                Ok(v) => v,
4125                Err(_) => return false,
4126            }
4127        }
4128        Value::Text(s) => {
4129            let trimmed = s.trim_start();
4130            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
4131                return false;
4132            }
4133            match crate::json::from_str::<crate::json::Value>(s) {
4134                Ok(v) => v,
4135                Err(_) => return false,
4136            }
4137        }
4138        _ => return false,
4139    };
4140    let mut cursor = &json;
4141    for seg in tail.split('.') {
4142        match cursor {
4143            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
4144                Some((_, v)) => cursor = v,
4145                None => return false,
4146            },
4147            _ => return false,
4148        }
4149    }
4150    !matches!(cursor, crate::json::Value::Null)
4151}
4152
4153/// Phase 2 PG parity: take a column value (possibly Null / Text /
4154/// Json) and return a `Value::Json` with the dotted `tail` path set
4155/// to `tenant_id`. Preserves every pre-existing key.
4156///
4157/// Accepts:
4158/// * `Value::Null`  → fresh `{tail: tenant_id}` object
4159/// * `Value::Json(bytes)` → parse, navigate / create path, re-serialize
4160/// * `Value::text(s)` if `s` is valid JSON → same as Json
4161/// * anything else → error (user supplied a scalar where we need
4162///   a JSON container)
4163fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
4164    let mut root = match current {
4165        Value::Null => crate::json::Value::Object(Default::default()),
4166        Value::Json(bytes) | Value::Blob(bytes) => {
4167            crate::json::from_slice(&bytes).map_err(|err| {
4168                RedDBError::Query(format!(
4169                    "tenant auto-fill: root column is not valid JSON ({err})"
4170                ))
4171            })?
4172        }
4173        Value::Text(s) => {
4174            if s.trim().is_empty() {
4175                crate::json::Value::Object(Default::default())
4176            } else {
4177                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
4178                    RedDBError::Query(format!(
4179                        "tenant auto-fill: text root is not valid JSON ({err})"
4180                    ))
4181                })?
4182            }
4183        }
4184        other => {
4185            return Err(RedDBError::Query(format!(
4186                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
4187            )));
4188        }
4189    };
4190
4191    // Navigate path segments, creating intermediate objects on demand.
4192    let segments: Vec<&str> = tail.split('.').collect();
4193    let mut cursor: &mut crate::json::Value = &mut root;
4194    for (i, seg) in segments.iter().enumerate() {
4195        let is_last = i + 1 == segments.len();
4196        let map = match cursor {
4197            crate::json::Value::Object(m) => m,
4198            _ => {
4199                return Err(RedDBError::Query(format!(
4200                    "tenant auto-fill: segment '{seg}' is not inside an object"
4201                )));
4202            }
4203        };
4204        if is_last {
4205            map.insert(
4206                seg.to_string(),
4207                crate::json::Value::String(tenant_id.to_string()),
4208            );
4209            break;
4210        }
4211        cursor = map
4212            .entry(seg.to_string())
4213            .or_insert_with(|| crate::json::Value::Object(Default::default()));
4214    }
4215
4216    let bytes = crate::json::to_vec(&root).map_err(|err| {
4217        RedDBError::Query(format!(
4218            "tenant auto-fill: failed to re-serialize JSON ({err})"
4219        ))
4220    })?;
4221    Ok(Value::Json(bytes))
4222}
4223
4224#[cfg(test)]
4225mod tests {
4226    use crate::storage::schema::Value;
4227    use crate::storage::wal::{WalReader, WalRecord};
4228    use crate::storage::{DeployProfile, StoragePackaging, StorageProfileSelection};
4229    use crate::{RedDBOptions, RedDBRuntime};
4230    use std::path::Path;
4231
4232    fn persistent_operational_options(path: &Path) -> RedDBOptions {
4233        RedDBOptions::persistent(path)
4234            .with_storage_profile(StorageProfileSelection {
4235                deploy_profile: DeployProfile::Embedded,
4236                packaging: StoragePackaging::OperationalDirectory,
4237                replica_count: 0,
4238                managed_backup: false,
4239                wal_retention: false,
4240            })
4241            .unwrap()
4242    }
4243
4244    fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4245        WalReader::open(wal_path)
4246            .expect("wal opens")
4247            .iter()
4248            .map(|record| record.expect("wal record decodes").1)
4249            .filter_map(|record| match record {
4250                WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4251                _ => None,
4252            })
4253            .collect()
4254    }
4255
4256    fn action_contains_text(action: &[u8], needle: &str) -> bool {
4257        action
4258            .windows(needle.len())
4259            .any(|window| window == needle.as_bytes())
4260    }
4261
4262    fn assert_statement_writes_collections_in_one_new_wal_batch(
4263        rt: &RedDBRuntime,
4264        wal_path: &Path,
4265        statement: &str,
4266        source: &str,
4267        event_queue: &str,
4268    ) {
4269        let before_batches = store_commit_batches(wal_path).len();
4270
4271        rt.execute_query(statement).unwrap();
4272
4273        let batches = store_commit_batches(wal_path);
4274        let statement_batches = &batches[before_batches..];
4275        let source_batch = statement_batches
4276            .iter()
4277            .position(|actions| {
4278                actions.iter().any(|action| {
4279                    action_contains_text(action, source)
4280                        && !action_contains_text(action, event_queue)
4281                })
4282            })
4283            .expect("source collection write batch is present");
4284        let event_batch = statement_batches
4285            .iter()
4286            .position(|actions| {
4287                actions
4288                    .iter()
4289                    .any(|action| action_contains_text(action, event_queue))
4290            })
4291            .expect("event queue write batch is present");
4292
4293        assert_eq!(
4294            source_batch, event_batch,
4295            "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4296        );
4297    }
4298
4299    #[test]
4300    fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4301        let dir = tempfile::tempdir().unwrap();
4302        let db_path = dir.path().join("events_dual_write.rdb");
4303        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4304        let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4305
4306        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4307            .unwrap();
4308        assert_statement_writes_collections_in_one_new_wal_batch(
4309            &rt,
4310            &wal_path,
4311            "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4312            "users",
4313            "users_events",
4314        );
4315    }
4316
4317    #[test]
4318    fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4319        let dir = tempfile::tempdir().unwrap();
4320        let db_path = dir.path().join("events_update_atomic.rdb");
4321        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4322        let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4323
4324        rt.execute_query(
4325            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4326        )
4327        .unwrap();
4328        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4329            .unwrap();
4330
4331        assert_statement_writes_collections_in_one_new_wal_batch(
4332            &rt,
4333            &wal_path,
4334            "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4335            "users",
4336            "user_updates",
4337        );
4338    }
4339
4340    #[test]
4341    fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4342        let dir = tempfile::tempdir().unwrap();
4343        let db_path = dir.path().join("events_delete_atomic.rdb");
4344        let wal_path = reddb_file::layout::unified_wal_path(&db_path);
4345        let rt = RedDBRuntime::with_options(persistent_operational_options(&db_path)).unwrap();
4346
4347        rt.execute_query(
4348            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4349        )
4350        .unwrap();
4351        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4352            .unwrap();
4353
4354        assert_statement_writes_collections_in_one_new_wal_batch(
4355            &rt,
4356            &wal_path,
4357            "DELETE FROM users WHERE id = 1",
4358            "users",
4359            "user_deletes",
4360        );
4361    }
4362
4363    #[test]
4364    fn update_where_id_in_with_hash_index_updates_expected_rows() {
4365        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4366        rt.execute_query("CREATE TABLE users (id INT, score INT)")
4367            .unwrap();
4368        for id in 0..5 {
4369            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4370                .unwrap();
4371        }
4372        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4373            .unwrap();
4374
4375        let updated = rt
4376            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4377            .unwrap();
4378        assert_eq!(updated.affected_rows, 3);
4379
4380        let selected = rt
4381            .execute_query("SELECT id, score FROM users ORDER BY id")
4382            .unwrap();
4383        let scores: Vec<(i64, i64)> = selected
4384            .result
4385            .records
4386            .iter()
4387            .map(|record| {
4388                let id = match record.get("id").unwrap() {
4389                    Value::Integer(value) => *value,
4390                    other => panic!("expected integer id, got {other:?}"),
4391                };
4392                let score = match record.get("score").unwrap() {
4393                    Value::Integer(value) => *value,
4394                    other => panic!("expected integer score, got {other:?}"),
4395                };
4396                (id, score)
4397            })
4398            .collect();
4399        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4400    }
4401
4402    /// Drives UPDATE through the shared `DmlTargetScan` module — the
4403    /// same code path DELETE uses (#51, #52). Exercises the indexed
4404    /// equality fast-path (WHERE id = N with a HASH index), the
4405    /// unindexed range scan (WHERE score > N), and the no-WHERE
4406    /// full-scan branch to confirm the extracted "find target rows"
4407    /// loop preserves affected-row counts and the resulting row state.
4408    #[test]
4409    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4410        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4411        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4412            .unwrap();
4413        for id in 0..5 {
4414            rt.execute_query(&format!(
4415                "INSERT INTO items (id, score) VALUES ({id}, {})",
4416                id * 10
4417            ))
4418            .unwrap();
4419        }
4420        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4421            .unwrap();
4422
4423        // Indexed equality UPDATE — hits the hash fast-path inside
4424        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
4425        // below the score>25 cutoff so the next assertion stays clean.
4426        let updated_one = rt
4427            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4428            .unwrap();
4429        assert_eq!(updated_one.affected_rows, 1);
4430
4431        // Unindexed scan UPDATE — bumps everyone with score > 25,
4432        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4433        // zoned/full-scan branch.
4434        let updated_many = rt
4435            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4436            .unwrap();
4437        assert_eq!(updated_many.affected_rows, 2);
4438
4439        let snapshot = rt
4440            .execute_query("SELECT id, score FROM items ORDER BY id")
4441            .unwrap();
4442        let pairs: Vec<(i64, i64)> = snapshot
4443            .result
4444            .records
4445            .iter()
4446            .map(|record| {
4447                let id = match record.get("id").unwrap() {
4448                    Value::Integer(value) => *value,
4449                    other => panic!("expected integer id, got {other:?}"),
4450                };
4451                let score = match record.get("score").unwrap() {
4452                    Value::Integer(value) => *value,
4453                    other => panic!("expected integer score, got {other:?}"),
4454                };
4455                (id, score)
4456            })
4457            .collect();
4458        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4459
4460        // Full-scan UPDATE with no WHERE rewrites every remaining row.
4461        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4462        assert_eq!(updated_all.affected_rows, 5);
4463        let after = rt
4464            .execute_query("SELECT score FROM items ORDER BY id")
4465            .unwrap();
4466        let scores: Vec<i64> = after
4467            .result
4468            .records
4469            .iter()
4470            .map(|record| match record.get("score").unwrap() {
4471                Value::Integer(value) => *value,
4472                other => panic!("expected integer score, got {other:?}"),
4473            })
4474            .collect();
4475        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4476    }
4477
4478    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
4479    /// both the index fast-path (WHERE id = N with a HASH index) and
4480    /// the unindexed scan path (WHERE score > N) to confirm the
4481    /// extracted "find target rows" loop preserves the affected-row
4482    /// count and which rows survive.
4483    #[test]
4484    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4485        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4486        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4487            .unwrap();
4488        for id in 0..5 {
4489            rt.execute_query(&format!(
4490                "INSERT INTO items (id, score) VALUES ({id}, {})",
4491                id * 10
4492            ))
4493            .unwrap();
4494        }
4495        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4496            .unwrap();
4497
4498        // Indexed equality DELETE — hits the hash fast-path inside
4499        // DmlTargetScan::find_target_ids.
4500        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4501        assert_eq!(deleted_one.affected_rows, 1);
4502
4503        // Unindexed scan DELETE — drops everyone with score > 25,
4504        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4505        // zoned/full-scan branch.
4506        let deleted_many = rt
4507            .execute_query("DELETE FROM items WHERE score > 25")
4508            .unwrap();
4509        assert_eq!(deleted_many.affected_rows, 2);
4510
4511        let surviving = rt
4512            .execute_query("SELECT id FROM items ORDER BY id")
4513            .unwrap();
4514        let ids: Vec<i64> = surviving
4515            .result
4516            .records
4517            .iter()
4518            .map(|record| match record.get("id").unwrap() {
4519                Value::Integer(value) => *value,
4520                other => panic!("expected integer id, got {other:?}"),
4521            })
4522            .collect();
4523        assert_eq!(ids, vec![0, 1]);
4524
4525        // Sanity: full-scan DELETE with no WHERE clears the rest.
4526        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4527        assert_eq!(deleted_rest.affected_rows, 2);
4528        let empty = rt.execute_query("SELECT id FROM items").unwrap();
4529        assert!(empty.result.records.is_empty());
4530    }
4531
4532    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
4533    /// INSERT but reject UPDATE and DELETE with the documented
4534    /// operator-facing error strings. Drives all three DML verbs so
4535    /// the centralized gate is exercised end-to-end.
4536    #[test]
4537    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4538        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4539        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4540            .unwrap();
4541
4542        // INSERT must succeed — APPEND ONLY exists precisely to allow
4543        // appends. The gate should be a no-op for INSERT.
4544        let inserted = rt
4545            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4546            .unwrap();
4547        assert_eq!(inserted.affected_rows, 1);
4548
4549        // UPDATE is rejected with the gate's UPDATE-specific message.
4550        let update_err = rt
4551            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4552            .unwrap_err();
4553        let msg = format!("{update_err}");
4554        assert!(
4555            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4556            "expected UPDATE rejection message, got: {msg}"
4557        );
4558
4559        // DELETE is rejected with the gate's DELETE-specific message.
4560        let delete_err = rt
4561            .execute_query("DELETE FROM events WHERE id = 1")
4562            .unwrap_err();
4563        let msg = format!("{delete_err}");
4564        assert!(
4565            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4566            "expected DELETE rejection message, got: {msg}"
4567        );
4568
4569        // Row should still be present — neither rejected mutation
4570        // touched storage.
4571        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4572        assert_eq!(surviving.result.records.len(), 1);
4573    }
4574
4575    /// CollectionContract gate: tables without an APPEND ONLY contract
4576    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
4577    /// is a true pass-through, not an accidental block.
4578    #[test]
4579    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4580        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4581        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4582            .unwrap();
4583
4584        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4585            .unwrap();
4586        let updated = rt
4587            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4588            .unwrap();
4589        assert_eq!(updated.affected_rows, 1);
4590        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4591        assert_eq!(deleted.affected_rows, 1);
4592    }
4593
4594    #[test]
4595    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4596        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4597        rt.execute_query(
4598            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4599        )
4600        .unwrap();
4601
4602        let inserted = rt
4603            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4604            .unwrap();
4605        assert_eq!(inserted.affected_rows, 1);
4606
4607        let events = queue_payloads(&rt, "audit_log");
4608        assert_eq!(events.len(), 1);
4609        let event = events[0].as_object().expect("event payload object");
4610        assert!(event
4611            .get("event_id")
4612            .and_then(crate::json::Value::as_str)
4613            .is_some_and(|value| !value.is_empty()));
4614        assert_eq!(
4615            event.get("op").and_then(crate::json::Value::as_str),
4616            Some("insert")
4617        );
4618        assert_eq!(
4619            event.get("collection").and_then(crate::json::Value::as_str),
4620            Some("users")
4621        );
4622        assert_eq!(
4623            event.get("id").and_then(crate::json::Value::as_u64),
4624            Some(7)
4625        );
4626        assert!(event
4627            .get("ts")
4628            .and_then(crate::json::Value::as_u64)
4629            .is_some());
4630        assert!(event
4631            .get("lsn")
4632            .and_then(crate::json::Value::as_u64)
4633            .is_some());
4634        assert!(matches!(
4635            event.get("tenant"),
4636            Some(crate::json::Value::Null)
4637        ));
4638        assert!(matches!(
4639            event.get("before"),
4640            Some(crate::json::Value::Null)
4641        ));
4642        let after = event
4643            .get("after")
4644            .and_then(crate::json::Value::as_object)
4645            .expect("after object");
4646        assert_eq!(
4647            after.get("id").and_then(crate::json::Value::as_u64),
4648            Some(7)
4649        );
4650        assert_eq!(
4651            after.get("email").and_then(crate::json::Value::as_str),
4652            Some("a@example.com")
4653        );
4654    }
4655
4656    #[test]
4657    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4658        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4659        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4660            .unwrap();
4661
4662        rt.execute_query(
4663            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4664        )
4665        .unwrap();
4666
4667        let events = queue_payloads(&rt, "users_events");
4668        assert_eq!(events.len(), 2);
4669        let mut previous_lsn = 0;
4670        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4671            let object = event.as_object().expect("event payload object");
4672            assert_eq!(
4673                object.get("op").and_then(crate::json::Value::as_str),
4674                Some("insert")
4675            );
4676            assert_eq!(
4677                object.get("id").and_then(crate::json::Value::as_u64),
4678                Some(expected_id)
4679            );
4680            let lsn = object
4681                .get("lsn")
4682                .and_then(crate::json::Value::as_u64)
4683                .expect("event lsn");
4684            assert!(
4685                lsn > previous_lsn,
4686                "event LSNs should increase in row order"
4687            );
4688            previous_lsn = lsn;
4689            let after = object
4690                .get("after")
4691                .and_then(crate::json::Value::as_object)
4692                .expect("after object");
4693            assert_eq!(
4694                after.get("id").and_then(crate::json::Value::as_u64),
4695                Some(expected_id)
4696            );
4697        }
4698    }
4699
4700    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4701        let result = rt
4702            .execute_query(&format!("QUEUE PEEK {queue} 10"))
4703            .expect("peek queue");
4704        result
4705            .result
4706            .records
4707            .iter()
4708            .map(
4709                |record| match record.get("payload").expect("payload column") {
4710                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4711                    other => panic!("expected JSON queue payload, got {other:?}"),
4712                },
4713            )
4714            .collect()
4715    }
4716
4717    // ── #112: auto-index user `id` on first insert ─────────────────────
4718
4719    /// First insert into a fresh collection that carries a column named
4720    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
4721    /// populate it transparently, and `WHERE id = N` lookups exercise
4722    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
4723    ///
4724    /// This is the load-bearing acceptance test for #112 — without the
4725    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
4726    /// fall through to a full segment scan (the 4× perf gap documented
4727    /// in `docs/perf/delete-sequential-2026-05-06.md`).
4728    #[test]
4729    fn auto_index_id_fires_on_first_insert() {
4730        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4731        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4732            .unwrap();
4733
4734        // Pre-condition: no index on `id` yet.
4735        assert!(
4736            rt.index_store_ref()
4737                .find_index_for_column("bench_users", "id")
4738                .is_none(),
4739            "freshly created collection should not have an `id` index"
4740        );
4741
4742        // Single-row INSERT — drives `MutationEngine::append_one`.
4743        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4744            .unwrap();
4745
4746        // Post-condition: hash index registered on `id`.
4747        let registered = rt
4748            .index_store_ref()
4749            .find_index_for_column("bench_users", "id")
4750            .expect("auto-index hook should have registered idx_id on first insert");
4751        assert_eq!(registered.name, "idx_id");
4752        assert_eq!(registered.collection, "bench_users");
4753        assert_eq!(registered.columns, vec!["id".to_string()]);
4754        assert!(matches!(
4755            registered.method,
4756            super::super::index_store::IndexMethodKind::Hash
4757        ));
4758
4759        // Subsequent inserts populate the index; `WHERE id = N` should
4760        // resolve via the hash fast path and round-trip every row.
4761        for id in 2..=5 {
4762            rt.execute_query(&format!(
4763                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4764                id * 10
4765            ))
4766            .unwrap();
4767        }
4768        for id in 1..=5 {
4769            let result = rt
4770                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4771                .unwrap();
4772            assert_eq!(
4773                result.result.records.len(),
4774                1,
4775                "id={id} should match one row"
4776            );
4777        }
4778
4779        // Delete via the hash fast-path — exactly the bench scenario the
4780        // perf doc identified as the 4× regression. With the index
4781        // present, `find_target_ids` short-circuits before
4782        // `for_each_entity_zoned` runs.
4783        let deleted = rt
4784            .execute_query("DELETE FROM bench_users WHERE id = 3")
4785            .unwrap();
4786        assert_eq!(deleted.affected_rows, 1);
4787    }
4788
4789    /// Bulk INSERT (the multi-row VALUES path) drives
4790    /// `MutationEngine::append_batch`. The hook must fire there too —
4791    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
4792    /// wire bulk INSERT) skip auto-indexing entirely.
4793    #[test]
4794    fn auto_index_id_fires_on_first_bulk_insert() {
4795        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4796        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4797            .unwrap();
4798
4799        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4800            .unwrap();
4801
4802        let registered = rt
4803            .index_store_ref()
4804            .find_index_for_column("bench_bulk", "id")
4805            .expect("auto-index hook should fire on first bulk insert");
4806        assert_eq!(registered.name, "idx_id");
4807
4808        // Every row populated via `index_entity_insert_batch`.
4809        for id in 1..=3 {
4810            let result = rt
4811                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4812                .unwrap();
4813            assert_eq!(result.result.records.len(), 1);
4814        }
4815    }
4816
4817    /// Hook is a no-op when the row carries no `id` column. Conservative
4818    /// match (case-sensitive `id`) — `Id`, `ID`, and `rid`
4819    /// don't trigger it.
4820    #[test]
4821    fn auto_index_id_skips_when_no_id_column() {
4822        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4823        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4824            .unwrap();
4825        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4826            .unwrap();
4827
4828        assert!(rt
4829            .index_store_ref()
4830            .find_index_for_column("plain", "id")
4831            .is_none());
4832        assert!(rt
4833            .index_store_ref()
4834            .find_index_for_column("plain", "uid")
4835            .is_none());
4836    }
4837
4838    /// Hook only fires once per collection. If an explicit
4839    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
4840    /// detects it via `find_index_for_column` and does NOT clobber it
4841    /// with a HASH index on the next insert.
4842    #[test]
4843    fn auto_index_id_skips_when_index_already_exists() {
4844        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4845        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4846            .unwrap();
4847        // User-declared BTREE index on `id` before any insert.
4848        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4849            .unwrap();
4850        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4851            .unwrap();
4852
4853        let registered = rt
4854            .index_store_ref()
4855            .find_index_for_column("pre", "id")
4856            .expect("user index should still be there");
4857        assert_eq!(
4858            registered.name, "user_idx",
4859            "auto-index hook must not overwrite an existing index"
4860        );
4861    }
4862
4863    /// Implicit `idx_id` is reaped when the collection drops. The
4864    /// existing `execute_drop_table` walks `list_indices` and drops every
4865    /// entry — confirm the auto-created index participates.
4866    #[test]
4867    fn auto_index_id_dropped_with_collection() {
4868        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4869        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4870            .unwrap();
4871        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4872            .unwrap();
4873        assert!(rt
4874            .index_store_ref()
4875            .find_index_for_column("ephemeral", "id")
4876            .is_some());
4877
4878        rt.execute_query("DROP TABLE ephemeral").unwrap();
4879
4880        assert!(
4881            rt.index_store_ref()
4882                .find_index_for_column("ephemeral", "id")
4883                .is_none(),
4884            "implicit `idx_id` must be reaped when its collection drops"
4885        );
4886    }
4887
4888    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
4889    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
4890    /// off, first insert leaves the collection without an `id` index —
4891    /// DELETE/UPDATE fall back to the scan path.
4892    #[test]
4893    fn auto_index_id_disabled_by_config() {
4894        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4895        let rt = RedDBRuntime::with_options(opts).unwrap();
4896
4897        rt.execute_query("CREATE TABLE off (id INT, score INT)")
4898            .unwrap();
4899        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4900            .unwrap();
4901
4902        assert!(
4903            rt.index_store_ref()
4904                .find_index_for_column("off", "id")
4905                .is_none(),
4906            "with auto_index_id=false, no implicit index should be created"
4907        );
4908    }
4909
4910    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
4911
4912    #[test]
4913    fn update_single_row_emits_update_event() {
4914        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4915        rt.execute_query(
4916            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4917        )
4918        .unwrap();
4919        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4920            .unwrap();
4921
4922        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4923            .unwrap();
4924
4925        let events = queue_payloads(&rt, "audit_log");
4926        assert_eq!(events.len(), 1, "expected exactly 1 update event");
4927        let event = events[0].as_object().expect("event payload object");
4928        assert_eq!(
4929            event.get("op").and_then(crate::json::Value::as_str),
4930            Some("update")
4931        );
4932        assert_eq!(
4933            event.get("collection").and_then(crate::json::Value::as_str),
4934            Some("users")
4935        );
4936        assert!(event
4937            .get("event_id")
4938            .and_then(crate::json::Value::as_str)
4939            .is_some_and(|v| !v.is_empty()));
4940        let before = event
4941            .get("before")
4942            .and_then(crate::json::Value::as_object)
4943            .expect("before must be an object");
4944        let after = event
4945            .get("after")
4946            .and_then(crate::json::Value::as_object)
4947            .expect("after must be an object");
4948        assert_eq!(
4949            before.get("name").and_then(crate::json::Value::as_str),
4950            Some("Alice"),
4951            "before.name should be the old value"
4952        );
4953        assert_eq!(
4954            after.get("name").and_then(crate::json::Value::as_str),
4955            Some("Bob"),
4956            "after.name should be the new value"
4957        );
4958    }
4959
4960    #[test]
4961    fn update_event_only_includes_changed_fields() {
4962        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4963        rt.execute_query(
4964            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4965        )
4966        .unwrap();
4967        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4968            .unwrap();
4969
4970        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4971            .unwrap();
4972
4973        let events = queue_payloads(&rt, "evts");
4974        assert_eq!(events.len(), 1);
4975        let event = events[0].as_object().unwrap();
4976        let before = event
4977            .get("before")
4978            .and_then(crate::json::Value::as_object)
4979            .unwrap();
4980        let after = event
4981            .get("after")
4982            .and_then(crate::json::Value::as_object)
4983            .unwrap();
4984        // Only changed field included.
4985        assert!(
4986            before.contains_key("name"),
4987            "before must include changed field"
4988        );
4989        assert!(
4990            after.contains_key("name"),
4991            "after must include changed field"
4992        );
4993        // Unchanged fields must not appear.
4994        assert!(
4995            !before.contains_key("email"),
4996            "before must not include unchanged email"
4997        );
4998        assert!(
4999            !after.contains_key("email"),
5000            "after must not include unchanged email"
5001        );
5002    }
5003
5004    #[test]
5005    fn multi_row_update_emits_one_event_per_row() {
5006        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5007        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
5008            .unwrap();
5009        rt.execute_query(
5010            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
5011        )
5012        .unwrap();
5013
5014        rt.execute_query("UPDATE items SET status = 'done'")
5015            .unwrap();
5016
5017        let events = queue_payloads(&rt, "evts");
5018        assert_eq!(events.len(), 3, "expected one update event per row");
5019        for event in &events {
5020            let obj = event.as_object().unwrap();
5021            assert_eq!(
5022                obj.get("op").and_then(crate::json::Value::as_str),
5023                Some("update")
5024            );
5025        }
5026    }
5027
5028    #[test]
5029    fn delete_single_row_emits_delete_event() {
5030        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5031        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
5032            .unwrap();
5033        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
5034            .unwrap();
5035
5036        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
5037
5038        let events = queue_payloads(&rt, "del_log");
5039        assert_eq!(events.len(), 1);
5040        let event = events[0].as_object().expect("event payload object");
5041        assert_eq!(
5042            event.get("op").and_then(crate::json::Value::as_str),
5043            Some("delete")
5044        );
5045        assert_eq!(
5046            event.get("collection").and_then(crate::json::Value::as_str),
5047            Some("users")
5048        );
5049        assert!(event
5050            .get("event_id")
5051            .and_then(crate::json::Value::as_str)
5052            .is_some_and(|v| !v.is_empty()));
5053        let before = event
5054            .get("before")
5055            .and_then(crate::json::Value::as_object)
5056            .expect("before must be an object for delete");
5057        assert_eq!(
5058            before.get("id").and_then(crate::json::Value::as_u64),
5059            Some(42)
5060        );
5061        assert_eq!(
5062            before.get("name").and_then(crate::json::Value::as_str),
5063            Some("Alice")
5064        );
5065        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
5066    }
5067
5068    #[test]
5069    fn multi_row_delete_emits_one_event_per_row() {
5070        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5071        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
5072            .unwrap();
5073        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
5074            .unwrap();
5075
5076        rt.execute_query("DELETE FROM items").unwrap();
5077
5078        let events = queue_payloads(&rt, "del_log");
5079        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
5080        for event in &events {
5081            let obj = event.as_object().unwrap();
5082            assert_eq!(
5083                obj.get("op").and_then(crate::json::Value::as_str),
5084                Some("delete")
5085            );
5086            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
5087        }
5088    }
5089
5090    #[test]
5091    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
5092        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5093        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
5094            .unwrap();
5095
5096        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5097            .unwrap();
5098        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
5099
5100        let events = queue_payloads(&rt, "evts");
5101        assert!(
5102            events.is_empty(),
5103            "UPDATE-only filter must not emit INSERT or DELETE events"
5104        );
5105    }
5106
5107    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
5108
5109    #[test]
5110    fn suppress_events_on_insert_emits_no_events() {
5111        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5112        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5113            .unwrap();
5114
5115        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5116            .unwrap();
5117
5118        let events = queue_payloads(&rt, "evts");
5119        assert!(
5120            events.is_empty(),
5121            "SUPPRESS EVENTS must prevent INSERT events"
5122        );
5123    }
5124
5125    #[test]
5126    fn suppress_events_on_update_emits_no_events() {
5127        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5128        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5129            .unwrap();
5130        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
5131            .unwrap();
5132        // drain the INSERT event
5133        let _ = queue_payloads(&rt, "evts");
5134        // Force pop to drain; simpler: just check new count after UPDATE
5135        rt.execute_query("QUEUE PURGE evts").unwrap();
5136
5137        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
5138            .unwrap();
5139
5140        let events = queue_payloads(&rt, "evts");
5141        assert!(
5142            events.is_empty(),
5143            "SUPPRESS EVENTS must prevent UPDATE events"
5144        );
5145    }
5146
5147    #[test]
5148    fn suppress_events_on_delete_emits_no_events() {
5149        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5150        rt.execute_query(
5151            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
5152        )
5153        .unwrap();
5154        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5155            .unwrap();
5156
5157        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
5158            .unwrap();
5159
5160        let events = queue_payloads(&rt, "evts");
5161        assert!(
5162            events.is_empty(),
5163            "SUPPRESS EVENTS must prevent DELETE events"
5164        );
5165    }
5166
5167    #[test]
5168    fn normal_insert_after_suppress_still_emits() {
5169        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5170        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5171            .unwrap();
5172
5173        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5174            .unwrap();
5175        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
5176            .unwrap();
5177
5178        let events = queue_payloads(&rt, "evts");
5179        assert_eq!(
5180            events.len(),
5181            1,
5182            "only the non-suppressed INSERT should emit"
5183        );
5184        assert_eq!(
5185            events[0].get("id").and_then(crate::json::Value::as_u64),
5186            Some(2)
5187        );
5188    }
5189}