Skip to main content

reddb_server/runtime/
impl_dml.rs

1//! DML execution: INSERT, UPDATE, DELETE via SQL AST
2//!
3//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
4//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
5//! operations through the existing `RuntimeEntityPort` trait so that all
6//! cross-cutting concerns (WAL, indexing, replication) are automatically
7//! applied.
8
9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11    CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12    CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13    RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16    build_row_update_contract_plan, entity_row_fields_snapshot,
17    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18    RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27    sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28    sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
/// Chunk size for the UPDATE apply path.
/// NOTE(review): presumably the maximum number of mutations handed to
/// `persist_update_chunk` / `flush_update_chunk` at once — the use site is
/// outside this view, confirm against `execute_update`.
const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
/// Edge label presumably used for parent→child links in tree-shaped data
/// (no use site visible here — verify before relying on this).
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Metadata key prefix presumably reserved for tree bookkeeping
/// (no use site visible here — verify before relying on this).
const TREE_METADATA_PREFIX: &str = "red.tree.";
40
/// One UPDATE `SET` assignment kept in compiled form for later evaluation
/// (held in `CompiledUpdatePlan::dynamic_assignments`).
#[derive(Clone)]
struct CompiledUpdateAssignment {
    /// Name of the assigned column.
    column: String,
    /// Right-hand-side expression of the assignment.
    expr: Expr,
    /// Operator of a compound assignment, `None` for a plain `=`.
    /// NOTE(review): assumed to model `SET col <op>= expr` — confirm in parser.
    compound_op: Option<BinOp>,
    /// Metadata key targeted instead of a row field, when set.
    /// NOTE(review): inferred from the name — verify.
    metadata_key: Option<&'static str>,
    /// Row-contract rule used to normalise the assigned value, if the target
    /// has one (presumably consumed by `normalize_row_update_value_for_rule`).
    row_rule: Option<RowUpdateColumnRule>,
}
49
/// Pre-compiled form of an UPDATE statement's assignments plus the
/// row-contract bookkeeping needed to apply them.
struct CompiledUpdatePlan {
    /// Field assignments already reduced to a concrete `Value`.
    static_field_assignments: Vec<(String, Value)>,
    /// Metadata assignments already reduced to a concrete `MetadataValue`.
    static_metadata_assignments: Vec<(String, MetadataValue)>,
    /// Assignments that still carry an `Expr` and must be evaluated later
    /// (presumably per row — TODO confirm).
    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    /// Row update contract plan, when the target collection has one
    /// (see `build_row_update_contract_plan`).
    row_contract_plan: Option<RowUpdateContractPlan>,
    /// Row columns modified by the UPDATE.
    row_modified_columns: Vec<String>,
    /// Whether any modified row column is subject to a uniqueness rule.
    /// NOTE(review): how this is computed is outside this view — verify.
    row_touches_unique_columns: bool,
}
58
/// Concrete values produced by evaluating the dynamic assignments of a
/// `CompiledUpdatePlan` (presumably for one row — TODO confirm at use site).
#[derive(Default)]
struct MaterializedUpdateAssignments {
    /// Evaluated field assignments as `(column, value)` pairs.
    dynamic_field_assignments: Vec<(String, Value)>,
    /// Evaluated metadata assignments as `(key, value)` pairs.
    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
}
64
65impl RedDBRuntime {
66    /// Issue #524 — public read of the in-memory chain tip. Returns `None`
67    /// when the collection is not a chain or has no rows (pre-genesis). On a
68    /// cold cache the first call falls back to a one-time scan so the HTTP
69    /// `GET /collections/:name/chain-tip` handler stays consistent with the
70    /// INSERT path after a restart.
71    pub fn chain_tip_for_collection(
72        &self,
73        collection: &str,
74    ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75        let store = self.inner.db.store();
76        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77            return None;
78        }
79        let mut cache = self.inner.chain_tip_cache.lock();
80        if let Some(existing) = cache.get(collection) {
81            return Some(existing.clone());
82        }
83        let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84        cache.insert(collection.to_string(), scanned.clone());
85        Some(scanned)
86    }
87
88    /// Issue #525 — walks the chain end-to-end, recomputes each block's hash
89    /// against the stored fields, and returns the verification outcome.  On
90    /// `ok == false` the integrity flag is persisted and the in-memory cache
91    /// is updated so subsequent INSERTs surface `ChainIntegrityBroken`.
92    ///
93    /// Returns `None` when the collection is absent or not a `KIND blockchain`.
94    pub fn verify_chain_for_collection(
95        &self,
96        collection: &str,
97    ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98        let store = self.inner.db.store();
99        let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100        if !outcome.ok {
101            crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102            self.inner
103                .chain_integrity_broken
104                .lock()
105                .insert(collection.to_string(), true);
106        }
107        Some(outcome)
108    }
109
110    /// Issue #525 — admin clears the `ChainIntegrityBroken` flag so the chain
111    /// accepts INSERTs again.  Returns `false` when the collection is not a
112    /// chain.
113    pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114        let store = self.inner.db.store();
115        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116            return false;
117        }
118        crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119        self.inner
120            .chain_integrity_broken
121            .lock()
122            .insert(collection.to_string(), false);
123        true
124    }
125
126    /// Issue #525 — INSERT-time check.  Combines in-memory cache (fast path)
127    /// with a one-time scan of `red_config` on cold start so the flag survives
128    /// restart.
129    fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130        {
131            let cache = self.inner.chain_integrity_broken.lock();
132            if let Some(v) = cache.get(collection) {
133                return *v;
134            }
135        }
136        let store = self.inner.db.store();
137        let persisted =
138            crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139                .unwrap_or(false);
140        self.inner
141            .chain_integrity_broken
142            .lock()
143            .insert(collection.to_string(), persisted);
144        persisted
145    }
146
147    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
148    /// target table is tenant-scoped and the user's column list does
149    /// not already name the tenant column.
150    ///
151    /// Returns:
152    /// * `Ok(None)` — no injection needed (non-tenant table, or user
153    ///   supplied the column explicitly). Caller uses the original
154    ///   query unchanged.
155    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
156    ///   + literal value appended to every row.
157    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
158    ///   the current session. Fails loudly so callers don't produce
159    ///   rows that RLS would then hide on read.
160    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
161        let Some(tenant_col) = self.tenant_column(&query.table) else {
162            return Ok(None);
163        };
164        // User already named the column (literal match) — trust them.
165        if query
166            .columns
167            .iter()
168            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
169        {
170            return Ok(None);
171        }
172
173        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
174        // nested key like `headers.tenant` we operate on the root
175        // column (`headers`) and set / add the nested path inside its
176        // JSON value. If the user named the root column we mutate in
177        // place; otherwise we create a fresh JSON column for every row.
178        if let Some(dot_pos) = tenant_col.find('.') {
179            let (root, tail) = tenant_col.split_at(dot_pos);
180            let tail = &tail[1..]; // drop leading '.'
181            return self.inject_dotted_tenant(query, root, tail);
182        }
183
184        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
185            return Err(RedDBError::Query(format!(
186                "INSERT into tenant-scoped table '{}' requires an active tenant — \
187                 run SET TENANT '<id>' first or name column '{}' explicitly",
188                query.table, tenant_col
189            )));
190        };
191
192        let mut augmented = query.clone();
193        augmented.columns.push(tenant_col);
194        let lit = Value::text(tenant_id.clone());
195        for row in augmented.values.iter_mut() {
196            row.push(lit.clone());
197        }
198        for row in augmented.value_exprs.iter_mut() {
199            row.push(crate::storage::query::ast::Expr::Literal {
200                value: lit.clone(),
201                span: crate::storage::query::ast::Span::synthetic(),
202            });
203        }
204        Ok(Some(augmented))
205    }
206
207    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
208    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
209    /// nested JSON instead of appending a flat column.
210    ///
211    /// Cases:
212    /// * Root column already in the INSERT list → mutate per-row JSON
213    ///   (parse, set path, re-serialize).
214    /// * Root column absent → create a fresh `{tail: tenant}` JSON
215    ///   object and append the root column to the INSERT.
216    fn inject_dotted_tenant(
217        &self,
218        query: &InsertQuery,
219        root: &str,
220        tail: &str,
221    ) -> RedDBResult<Option<InsertQuery>> {
222        let active_tenant = crate::runtime::impl_core::current_tenant();
223        let mut augmented = query.clone();
224        let root_idx = augmented
225            .columns
226            .iter()
227            .position(|c| c.eq_ignore_ascii_case(root));
228
229        if let Some(idx) = root_idx {
230            // User supplied the root column. Per-row: if the dotted
231            // tail is already present we trust the user (admin / bulk
232            // loader scenario); otherwise fill from the active
233            // tenant. An unbound tenant is only an error when some
234            // row actually needs filling.
235            for row in augmented.values.iter_mut() {
236                let Some(slot) = row.get_mut(idx) else {
237                    continue;
238                };
239                if dotted_tail_already_set(slot, tail) {
240                    continue;
241                }
242                let Some(tenant_id) = &active_tenant else {
243                    return Err(RedDBError::Query(format!(
244                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
245                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
246                        query.table, root, tail
247                    )));
248                };
249                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
250            }
251            // Expression row is kept in sync by re-wrapping the
252            // mutated literal; the canonical path will re-evaluate
253            // against the same JSON shape.
254            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
255                if let Some(slot) = row.get_mut(idx) {
256                    let new_value = augmented
257                        .values
258                        .get(row_idx)
259                        .and_then(|v| v.get(idx))
260                        .cloned()
261                        .unwrap_or(Value::Null);
262                    *slot = crate::storage::query::ast::Expr::Literal {
263                        value: new_value,
264                        span: crate::storage::query::ast::Span::synthetic(),
265                    };
266                }
267            }
268        } else {
269            // No root column in the INSERT list — auto-fill needs a
270            // bound tenant to synthesise one. Error loud so we never
271            // create a tenant-less row that RLS would then hide.
272            let Some(tenant_id) = &active_tenant else {
273                return Err(RedDBError::Query(format!(
274                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
275                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
276                    query.table, root, tail
277                )));
278            };
279            // Create a fresh JSON column with only the tenant path set.
280            augmented.columns.push(root.to_string());
281            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
282            for row in augmented.values.iter_mut() {
283                row.push(fresh.clone());
284            }
285            for row in augmented.value_exprs.iter_mut() {
286                row.push(crate::storage::query::ast::Expr::Literal {
287                    value: fresh.clone(),
288                    span: crate::storage::query::ast::Span::synthetic(),
289                });
290            }
291        }
292
293        Ok(Some(augmented))
294    }
295
296    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
297    /// `lsns` is empty because events fire at commit time.
298    fn delete_entities_batch(
299        &self,
300        collection: &str,
301        ids: &[EntityId],
302    ) -> RedDBResult<(u64, Vec<u64>)> {
303        if ids.is_empty() {
304            return Ok((0, vec![]));
305        }
306
307        let store = self.db().store();
308        let Some(manager) = store.get_collection(collection) else {
309            return Ok((0, vec![]));
310        };
311
312        let active_xid = self.current_xid();
313        let conn_id = crate::runtime::impl_core::current_connection_id();
314        let mut autocommit_xid = None;
315        let mut tombstoned_ids = Vec::new();
316        let mut tombstoned_entities = Vec::new();
317        let mut physical_delete_ids = Vec::new();
318        let table_row_resolver =
319            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
320
321        for &id in ids {
322            let Some(mut entity) = manager.get(id) else {
323                continue;
324            };
325            if matches!(entity.data, EntityData::Row(_)) {
326                let previous_xmax = entity.xmax;
327                if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
328                    if table_row_resolver.resolve_candidate(&entity).is_none() {
329                        continue;
330                    }
331                } else if entity.xmax != 0 {
332                    continue;
333                }
334
335                let xid = match active_xid {
336                    Some(xid) => xid,
337                    None => match autocommit_xid {
338                        Some(xid) => xid,
339                        None => {
340                            let mgr = self.snapshot_manager();
341                            let xid = mgr.begin();
342                            autocommit_xid = Some(xid);
343                            xid
344                        }
345                    },
346                };
347                entity.set_xmax(xid);
348                if manager.update(entity.clone()).is_ok() {
349                    if active_xid.is_some() {
350                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
351                    }
352                    tombstoned_entities.push(entity);
353                    tombstoned_ids.push(id);
354                }
355            } else {
356                physical_delete_ids.push(id);
357            }
358        }
359
360        if let Some(xid) = autocommit_xid {
361            self.snapshot_manager().commit(xid);
362        }
363
364        let mut affected = tombstoned_ids.len() as u64;
365        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
366        if active_xid.is_some() {
367            store
368                .persist_entities_to_pager(collection, &tombstoned_entities)
369                .map_err(|err| RedDBError::Internal(err.to_string()))?;
370        } else {
371            store
372                .persist_entities_to_pager(collection, &tombstoned_entities)
373                .map_err(|err| RedDBError::Internal(err.to_string()))?;
374            for id in &tombstoned_ids {
375                store.context_index().remove_entity(*id);
376                let lsn = self.cdc_emit(
377                    crate::replication::cdc::ChangeOperation::Delete,
378                    collection,
379                    id.raw(),
380                    "entity",
381                );
382                lsns.push(lsn);
383            }
384        }
385
386        let deleted_ids = store
387            .delete_batch(collection, &physical_delete_ids)
388            .map_err(|err| RedDBError::Internal(err.to_string()))?;
389        affected += deleted_ids.len() as u64;
390        for id in &deleted_ids {
391            store.context_index().remove_entity(*id);
392            let lsn = self.cdc_emit(
393                crate::replication::cdc::ChangeOperation::Delete,
394                collection,
395                id.raw(),
396                "entity",
397            );
398            lsns.push(lsn);
399        }
400
401        Ok((affected, lsns))
402    }
403
404    /// Flushes context-index updates and CDC for each applied mutation.
405    /// Returns one LSN per entity in the same order as `applied`.
406    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
407        if applied.is_empty() {
408            return Ok(Vec::new());
409        }
410
411        let store = self.db().store();
412        if applied.iter().any(|item| item.context_index_dirty) {
413            store.context_index().index_entities(
414                &applied[0].collection,
415                applied
416                    .iter()
417                    .filter(|item| item.context_index_dirty)
418                    .map(|item| &item.entity),
419            );
420        }
421
422        for item in applied {
423            self.refresh_update_secondary_indexes(item)?;
424        }
425
426        let mut lsns = Vec::with_capacity(applied.len());
427        for item in applied {
428            let lsn = self.cdc_emit_prebuilt(
429                crate::replication::cdc::ChangeOperation::Update,
430                &item.collection,
431                &item.entity,
432                update_cdc_item_kind(self, &item.collection, &item.entity),
433                item.metadata.as_ref(),
434                false,
435            );
436            lsns.push(lsn);
437        }
438        Ok(lsns)
439    }
440
441    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
442        self.persist_applied_entity_mutations(applied)
443    }
444
445    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
446        if applied.pre_mutation_fields.is_empty() {
447            return Ok(());
448        }
449        let post = entity_row_fields_snapshot(&applied.entity);
450        if post.is_empty() {
451            return Ok(());
452        }
453
454        let indexed_cols = self
455            .index_store_ref()
456            .indexed_columns_set(&applied.collection);
457        if indexed_cols.is_empty() {
458            return Ok(());
459        }
460
461        if let Some(old_version) = applied.replaced_entity.as_ref() {
462            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
463                .pre_mutation_fields
464                .iter()
465                .filter(|(col, _)| indexed_cols.contains(col))
466                .cloned()
467                .collect();
468            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
469                .iter()
470                .filter(|(col, _)| indexed_cols.contains(col))
471                .cloned()
472                .collect();
473            if !old_index_fields.is_empty() {
474                self.index_store_ref()
475                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
476                    .map_err(crate::RedDBError::Internal)?;
477            }
478            if !new_index_fields.is_empty() {
479                self.index_store_ref()
480                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
481                    .map_err(crate::RedDBError::Internal)?;
482            }
483            return Ok(());
484        }
485
486        let damage =
487            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
488        if damage
489            .touched_columns()
490            .into_iter()
491            .any(|col| indexed_cols.contains(col))
492        {
493            self.index_store_ref()
494                .index_entity_update(
495                    &applied.collection,
496                    applied.id,
497                    &applied.pre_mutation_fields,
498                    &post,
499                )
500                .map_err(crate::RedDBError::Internal)?;
501        }
502        Ok(())
503    }
504
505    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
506    ///
507    /// Each row in `query.values` is zipped with `query.columns` to produce a
508    /// set of named fields, which is then dispatched based on entity_type.
509    pub fn execute_insert(
510        &self,
511        raw_query: &str,
512        query: &InsertQuery,
513    ) -> RedDBResult<RuntimeQueryResult> {
514        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
515        // CollectionContract gate (#49): single entry point for the
516        // operator's collection-level write rules. Today this is a
517        // no-op for INSERT (APPEND ONLY permits insert); routing
518        // through the gate now means future contract bits — versioned,
519        // vault-only writes — plug in once instead of per verb.
520        crate::runtime::collection_contract::CollectionContractGate::check(
521            self,
522            &query.table,
523            crate::runtime::collection_contract::MutationKind::Insert,
524        )?;
525        // Phase 2.5.4 table-scoped tenancy: if the target table is
526        // tenant-scoped and the user didn't name the tenant column,
527        // auto-inject it with the thread-local `CURRENT_TENANT()`
528        // value. When the column is named explicitly we trust the
529        // caller (useful for admin tooling that writes on behalf of
530        // specific tenants). An unbound tenant on an implicit-fill
531        // path errors up front rather than producing a row the RLS
532        // policy would silently hide.
533        let augmented_owned;
534        let query = match self.maybe_inject_tenant_column(query)? {
535            Some(new_q) => {
536                augmented_owned = new_q;
537                &augmented_owned
538            }
539            None => query,
540        };
541        self.check_insert_column_policy(query)?;
542
543        let mut inserted_count: u64 = 0;
544        let effective_rows =
545            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
546
547        // Ensure the collection exists (auto-create on first insert).
548        let store = self.inner.db.store();
549        let _ = store.get_or_create_collection(&query.table);
550        let declared_model = self
551            .db()
552            .collection_contract_arc(&query.table)
553            .map(|contract| contract.declared_model);
554
555        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
556            if query.returning.is_some() {
557                Some(Vec::with_capacity(effective_rows.len()))
558            } else {
559                None
560            };
561        let mut returning_result: Option<UnifiedResult> = None;
562
563        if matches!(query.entity_type, InsertEntityType::Row)
564            && !matches!(
565                declared_model,
566                Some(crate::catalog::CollectionModel::TimeSeries)
567            )
568        {
569            // Issue #523 + #524: blockchain collections seal each row into the
570            // chain. When the caller omits the reserved columns, the engine
571            // auto-fills (#523). When the caller supplies any reserved column,
572            // the values are validated against the current tip and a mismatch
573            // surfaces a `BlockchainConflict:` error mapped to HTTP 409 (#524).
574            //
575            // The whole batch runs under a per-collection chain lock so two
576            // concurrent submitters can't both bind to the same prev_hash —
577            // the loser observes the advanced tip and gets 409 with the new
578            // tip so it can retry.
579            let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
580            let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
581                Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
582            } else {
583                None
584            };
585            let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
586
587            // Issue #525 — refuse new blocks if the chain has been marked
588            // `integrity = broken` until an admin clears the flag.
589            if chain_mode && self.is_chain_integrity_broken(&query.table) {
590                return Err(RedDBError::InvalidOperation(format!(
591                    "ChainIntegrityBroken: collection '{}' is locked until \
592                     POST /collections/{}/clear-integrity-flag is called by an admin",
593                    query.table, query.table
594                )));
595            }
596
597            // Pull the tip from the in-memory cache; fall back to a one-time
598            // scan if the cache hasn't seen this collection yet (cold start
599            // after restart). Cache is updated below as rows are sealed.
600            let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
601                if chain_mode {
602                    let mut cache = self.inner.chain_tip_cache.lock();
603                    if let Some(existing) = cache.get(&query.table) {
604                        Some(existing.clone())
605                    } else if let Some(scanned) =
606                        crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
607                    {
608                        cache.insert(query.table.clone(), scanned.clone());
609                        Some(scanned)
610                    } else {
611                        None
612                    }
613                } else {
614                    None
615                };
616
617            let mut rows = Vec::with_capacity(effective_rows.len());
618            for row_values in &effective_rows {
619                if row_values.len() != query.columns.len() {
620                    return Err(RedDBError::Query(format!(
621                        "INSERT column count ({}) does not match value count ({})",
622                        query.columns.len(),
623                        row_values.len()
624                    )));
625                }
626                let (mut fields, mut metadata) =
627                    split_insert_metadata(self, &query.columns, row_values)?;
628                if chain_mode {
629                    use crate::runtime::blockchain_kind::{
630                        chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
631                        COL_TIMESTAMP, RESERVED_COLUMNS,
632                    };
633                    let supplied_height = fields
634                        .iter()
635                        .find(|(k, _)| k == COL_BLOCK_HEIGHT)
636                        .map(|(_, v)| v.clone());
637                    let supplied_prev = fields
638                        .iter()
639                        .find(|(k, _)| k == COL_PREV_HASH)
640                        .map(|(_, v)| v.clone());
641                    let supplied_ts = fields
642                        .iter()
643                        .find(|(k, _)| k == COL_TIMESTAMP)
644                        .map(|(_, v)| v.clone());
645                    let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
646                    let user_supplied_any = supplied_height.is_some()
647                        || supplied_prev.is_some()
648                        || supplied_ts.is_some()
649                        || supplied_hash;
650
651                    fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
652                    let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
653
654                    let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
655                        Some(t) => (t.hash, t.height + 1),
656                        None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
657                    };
658                    let server_now = crate::runtime::blockchain_kind::now_ms();
659
660                    let (use_prev, use_height, use_ts) = if user_supplied_any {
661                        // Caller is participating in the chain protocol —
662                        // every field must be supplied AND match the tip.
663                        if supplied_hash {
664                            return Err(chain_conflict_error(
665                                tip_next_height.saturating_sub(1),
666                                tip_prev_hash,
667                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
668                                server_now,
669                                "hash column is engine-computed and cannot be supplied",
670                            ));
671                        }
672                        let caller_prev = match &supplied_prev {
673                            Some(Value::Blob(b)) if b.len() == 32 => {
674                                let mut a = [0u8; 32];
675                                a.copy_from_slice(b);
676                                a
677                            }
678                            Some(Value::Text(s)) if s.len() == 64 => {
679                                // Accept hex-encoded prev_hash so JSON / SQL
680                                // callers without literal-blob syntax can
681                                // still participate in the chain protocol.
682                                let mut a = [0u8; 32];
683                                let mut ok = true;
684                                for (i, slot) in a.iter_mut().enumerate() {
685                                    let pair = &s.as_ref()[i * 2..i * 2 + 2];
686                                    match u8::from_str_radix(pair, 16) {
687                                        Ok(byte) => *slot = byte,
688                                        Err(_) => {
689                                            ok = false;
690                                            break;
691                                        }
692                                    }
693                                }
694                                if !ok {
695                                    return Err(chain_conflict_error(
696                                        tip_next_height.saturating_sub(1),
697                                        tip_prev_hash,
698                                        chain_tip_full
699                                            .as_ref()
700                                            .map(|t| t.timestamp_ms)
701                                            .unwrap_or(0),
702                                        server_now,
703                                        "prev_hash is not valid hex",
704                                    ));
705                                }
706                                a
707                            }
708                            _ => {
709                                return Err(chain_conflict_error(
710                                    tip_next_height.saturating_sub(1),
711                                    tip_prev_hash,
712                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
713                                    server_now,
714                                    "prev_hash missing or not a 32-byte Blob",
715                                ));
716                            }
717                        };
718                        if caller_prev != tip_prev_hash {
719                            return Err(chain_conflict_error(
720                                tip_next_height.saturating_sub(1),
721                                tip_prev_hash,
722                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
723                                server_now,
724                                "prev_hash does not match current tip",
725                            ));
726                        }
727                        let caller_height = match &supplied_height {
728                            Some(Value::UnsignedInteger(v)) => *v,
729                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
730                            _ => {
731                                return Err(chain_conflict_error(
732                                    tip_next_height.saturating_sub(1),
733                                    tip_prev_hash,
734                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
735                                    server_now,
736                                    "block_height missing or not an unsigned integer",
737                                ));
738                            }
739                        };
740                        if caller_height != tip_next_height {
741                            return Err(chain_conflict_error(
742                                tip_next_height.saturating_sub(1),
743                                tip_prev_hash,
744                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
745                                server_now,
746                                "block_height does not match tip+1",
747                            ));
748                        }
749                        let caller_ts = match &supplied_ts {
750                            Some(Value::UnsignedInteger(v)) => *v,
751                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
752                            _ => {
753                                return Err(chain_conflict_error(
754                                    tip_next_height.saturating_sub(1),
755                                    tip_prev_hash,
756                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
757                                    server_now,
758                                    "timestamp missing or not an unsigned integer",
759                                ));
760                            }
761                        };
762                        let drift = (caller_ts as i128) - (server_now as i128);
763                        if drift.abs() > 60_000 {
764                            return Err(chain_conflict_error(
765                                tip_next_height.saturating_sub(1),
766                                tip_prev_hash,
767                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
768                                server_now,
769                                "timestamp outside ±60s of server_time",
770                            ));
771                        }
772                        (caller_prev, caller_height, caller_ts)
773                    } else {
774                        (tip_prev_hash, tip_next_height, server_now)
775                    };
776
777                    let (reserved, new_hash) =
778                        crate::runtime::blockchain_kind::make_block_reserved_fields(
779                            use_prev, use_height, use_ts, &payload,
780                        );
781                    fields.extend(reserved);
782                    chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
783                        height: use_height,
784                        hash: new_hash,
785                        timestamp_ms: use_ts,
786                    });
787                }
788                // Issue #522 — signed-writes verification. On collections
789                // created with `SIGNED_BY (...)` the row must carry valid
790                // `signer_pubkey` + `signature` reserved columns. Runs
791                // after chain_mode so canonical payload covers user-supplied
792                // fields only (blockchain reserved columns are filtered by
793                // `canonical_payload`; the two signed-writes reserved
794                // columns are split out before payload computation, then
795                // re-attached for storage). The blockchain + SIGNED_BY
796                // composition is owned by issue #526; we keep #522 to the
797                // non-chain path and let chain_mode collections punt to that
798                // slice rather than half-wire it here.
799                if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
800                    let (pk_col, sig_col, residual) =
801                        crate::runtime::signed_writes_kind::split_signature_fields(fields);
802                    let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
803                    let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
804                    crate::runtime::signed_writes_kind::verify_row(
805                        &reg,
806                        pk_col.as_ref().map(|c| c.bytes.as_slice()),
807                        sig_col.as_ref().map(|c| c.bytes.as_slice()),
808                        &payload,
809                    )
810                    .map_err(crate::runtime::signed_writes_kind::map_error)?;
811                    fields = residual;
812                    // Round-trip the reserved columns with the value
813                    // type the caller supplied (Text/hex on the SQL path,
814                    // Blob on the binary path). Keeps SELECT and WHERE
815                    // predicates symmetric with the INSERT shape.
816                    if let Some(col) = pk_col {
817                        fields.push((
818                            crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
819                            col.raw_value,
820                        ));
821                    }
822                    if let Some(col) = sig_col {
823                        fields.push((
824                            crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
825                            col.raw_value,
826                        ));
827                    }
828                }
829                merge_with_clauses(
830                    &mut metadata,
831                    query.ttl_ms,
832                    query.expires_at_ms,
833                    &query.with_metadata,
834                );
835                if let Some(snaps) = returning_snapshots.as_mut() {
836                    snaps.push(fields.clone());
837                }
838                rows.push(CreateRowInput {
839                    collection: query.table.clone(),
840                    fields,
841                    metadata,
842                    node_links: Vec::new(),
843                    vector_links: Vec::new(),
844                });
845            }
846            let outputs = self.create_rows_batch(CreateRowsBatchInput {
847                collection: query.table.clone(),
848                rows,
849                suppress_events: query.suppress_events,
850            })?;
851            inserted_count = outputs.len() as u64;
852
853            // Chain mode: commit the new tip to the in-memory cache only after
854            // the batch persisted successfully. If the batch threw mid-way the
855            // cache stays on the previous tip and the chain lock releases.
856            if chain_mode {
857                if let Some(new_tip) = chain_tip_full.as_ref() {
858                    self.inner
859                        .chain_tip_cache
860                        .lock()
861                        .insert(query.table.clone(), new_tip.clone());
862                }
863            }
864
865            // Hypertable chunk routing: if this table was declared via
866            // CREATE HYPERTABLE, register each row's time-column value
867            // with the registry so chunk metadata (bounds, row counts,
868            // TTL eligibility) stays current. This is what lets
869            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
870            // retention daemon sweep expired chunks without scanning
871            // every row.
872            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
873                let time_col = &spec.time_column;
874                // Find the column's index in the INSERT column list.
875                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
876                    for row in &effective_rows {
877                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
878                            if *n >= 0 {
879                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
880                            }
881                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
882                            let _ = self.inner.db.hypertables().route(&query.table, *n);
883                        }
884                    }
885                }
886            }
887
888            if let (Some(items), Some(snaps)) =
889                (query.returning.as_ref(), returning_snapshots.take())
890            {
891                let snaps = row_insert_returning_snapshots(&outputs, snaps);
892                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
893            }
894        } else {
895            // Issue #419: surface the inserted entity id on every INSERT path.
896            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
897            // so a RETURNING clause (and the unconditional inserted_ids list,
898            // below) can expose the engine-assigned id. TimeSeries (the row
899            // branch in this else) still returns the not-supported error
900            // because create_timeseries_point isn't plumbed through this fn.
901            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
902                Vec::with_capacity(effective_rows.len());
903            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
904            {
905                Vec::with_capacity(effective_rows.len())
906            } else {
907                Vec::new()
908            };
909            if matches!(
910                query.entity_type,
911                InsertEntityType::Node | InsertEntityType::Edge
912            ) {
913                enum PreparedGraphInsert {
914                    Node {
915                        fields: Vec<(String, Value)>,
916                        input: CreateNodeInput,
917                    },
918                    Edge {
919                        fields: Vec<(String, Value)>,
920                        input: CreateEdgeInput,
921                    },
922                }
923
924                let mut prepared = Vec::with_capacity(effective_rows.len());
925                for row_values in &effective_rows {
926                    if row_values.len() != query.columns.len() {
927                        return Err(RedDBError::Query(format!(
928                            "INSERT column count ({}) does not match value count ({})",
929                            query.columns.len(),
930                            row_values.len()
931                        )));
932                    }
933
934                    match query.entity_type {
935                        InsertEntityType::Node => {
936                            let (node_values, mut metadata) =
937                                split_insert_metadata(self, &query.columns, row_values)?;
938                            merge_with_clauses(
939                                &mut metadata,
940                                query.ttl_ms,
941                                query.expires_at_ms,
942                                &query.with_metadata,
943                            );
944                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
945                            apply_collection_default_ttl_metadata(
946                                self,
947                                &query.table,
948                                &mut metadata,
949                            );
950                            let (columns, values) = pairwise_columns_values(&node_values);
951                            let label = find_column_value_string(&columns, &values, "label")?;
952                            let node_type =
953                                find_column_value_opt_string(&columns, &values, "node_type");
954                            let properties = extract_remaining_properties(
955                                &columns,
956                                &values,
957                                &["label", "node_type"],
958                            );
959                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
960                                properties.iter().map(|(key, _)| key.as_str()),
961                                &format!("node '{}'", query.table),
962                            )?;
963                            prepared.push(PreparedGraphInsert::Node {
964                                fields: node_values,
965                                input: CreateNodeInput {
966                                    collection: query.table.clone(),
967                                    label,
968                                    node_type,
969                                    properties,
970                                    metadata,
971                                    embeddings: Vec::new(),
972                                    table_links: Vec::new(),
973                                    node_links: Vec::new(),
974                                },
975                            });
976                        }
977                        InsertEntityType::Edge => {
978                            let (edge_values, mut metadata) =
979                                split_insert_metadata(self, &query.columns, row_values)?;
980                            merge_with_clauses(
981                                &mut metadata,
982                                query.ttl_ms,
983                                query.expires_at_ms,
984                                &query.with_metadata,
985                            );
986                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
987                            apply_collection_default_ttl_metadata(
988                                self,
989                                &query.table,
990                                &mut metadata,
991                            );
992                            let (columns, values) = pairwise_columns_values(&edge_values);
993                            let label = find_column_value_string(&columns, &values, "label")?;
994                            ensure_non_tree_structural_edge_label(&label)?;
995                            let from_id = resolve_edge_endpoint_any(
996                                self.inner.db.store().as_ref(),
997                                &query.table,
998                                &columns,
999                                &values,
1000                                &["from_rid", "from"],
1001                            )?;
1002                            let to_id = resolve_edge_endpoint_any(
1003                                self.inner.db.store().as_ref(),
1004                                &query.table,
1005                                &columns,
1006                                &values,
1007                                &["to_rid", "to"],
1008                            )?;
1009                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
1010                            let properties = extract_remaining_properties(
1011                                &columns,
1012                                &values,
1013                                &["label", "from_rid", "to_rid", "from", "to", "weight"],
1014                            );
1015                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1016                                properties.iter().map(|(key, _)| key.as_str()),
1017                                &format!("edge '{}'", query.table),
1018                            )?;
1019                            prepared.push(PreparedGraphInsert::Edge {
1020                                fields: edge_values,
1021                                input: CreateEdgeInput {
1022                                    collection: query.table.clone(),
1023                                    label,
1024                                    from: EntityId::new(from_id),
1025                                    to: EntityId::new(to_id),
1026                                    weight,
1027                                    properties,
1028                                    metadata,
1029                                },
1030                            });
1031                        }
1032                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1033                    }
1034                }
1035
1036                ensure_graph_insert_contract(self, &query.table)?;
1037                let mut batch = self.inner.db.batch();
1038                for item in prepared {
1039                    match item {
1040                        PreparedGraphInsert::Node { fields, input } => {
1041                            if query.returning.is_some() {
1042                                returning_field_snaps.push(fields);
1043                            }
1044                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1045                            batch = batch.add_node_with_type(
1046                                input.collection,
1047                                input.label,
1048                                node_type,
1049                                input.properties.into_iter().collect(),
1050                                input.metadata.into_iter().collect(),
1051                            );
1052                        }
1053                        PreparedGraphInsert::Edge { fields, input } => {
1054                            if query.returning.is_some() {
1055                                returning_field_snaps.push(fields);
1056                            }
1057                            batch = batch.add_edge(
1058                                input.collection,
1059                                input.label,
1060                                input.from,
1061                                input.to,
1062                                input.weight.unwrap_or(1.0),
1063                                input.properties.into_iter().collect(),
1064                                input.metadata.into_iter().collect(),
1065                            );
1066                        }
1067                    }
1068                }
1069                let batch_result = batch
1070                    .execute()
1071                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1072                let (ids, entity_kind) = match query.entity_type {
1073                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1074                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1075                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1076                };
1077                for id in &ids {
1078                    self.stamp_xmin_if_in_txn(&query.table, *id);
1079                }
1080                if query.returning.is_some() {
1081                    returning_field_snaps = graph_insert_returning_snapshots(
1082                        self.inner.db.store().as_ref(),
1083                        &query.table,
1084                        &ids,
1085                    );
1086                }
1087                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1088                let store = self.inner.db.store();
1089                entity_outputs.extend(ids.iter().map(|id| {
1090                    crate::application::entity::CreateEntityOutput {
1091                        id: *id,
1092                        entity: store.get(&query.table, *id),
1093                    }
1094                }));
1095                inserted_count = ids.len() as u64;
1096            } else {
1097                for row_values in &effective_rows {
1098                    if row_values.len() != query.columns.len() {
1099                        return Err(RedDBError::Query(format!(
1100                            "INSERT column count ({}) does not match value count ({})",
1101                            query.columns.len(),
1102                            row_values.len()
1103                        )));
1104                    }
1105
1106                    match query.entity_type {
1107                        InsertEntityType::Row => {
1108                            if query.returning.is_some() {
1109                                return Err(RedDBError::Query(
1110                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1111                                    .to_string(),
1112                            ));
1113                            }
1114                            let (fields, mut metadata) =
1115                                split_insert_metadata(self, &query.columns, row_values)?;
1116                            merge_with_clauses(
1117                                &mut metadata,
1118                                query.ttl_ms,
1119                                query.expires_at_ms,
1120                                &query.with_metadata,
1121                            );
1122                            self.insert_timeseries_point(&query.table, fields, metadata)?;
1123                        }
1124                        InsertEntityType::Node | InsertEntityType::Edge => {
1125                            unreachable!("NODE and EDGE are handled by the prepared graph path")
1126                        }
1127                        InsertEntityType::Vector => {
1128                            let (vector_values, mut metadata) =
1129                                split_insert_metadata(self, &query.columns, row_values)?;
1130                            merge_with_clauses(
1131                                &mut metadata,
1132                                query.ttl_ms,
1133                                query.expires_at_ms,
1134                                &query.with_metadata,
1135                            );
1136                            let (columns, values) = pairwise_columns_values(&vector_values);
1137                            let dense = find_column_value_vec_f32_any(
1138                                &columns,
1139                                &values,
1140                                &["dense", "embedding"],
1141                            )?;
1142                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1143                            let content =
1144                                find_column_value_opt_string(&columns, &values, "content");
1145                            if query.returning.is_some() {
1146                                returning_field_snaps.push(vector_values.clone());
1147                            }
1148                            let input = CreateVectorInput {
1149                                collection: query.table.clone(),
1150                                dense,
1151                                content,
1152                                metadata,
1153                                link_row: None,
1154                                link_node: None,
1155                            };
1156                            entity_outputs.push(self.create_vector(input)?);
1157                        }
1158                        InsertEntityType::Document => {
1159                            let (document_values, mut metadata) =
1160                                split_insert_metadata(self, &query.columns, row_values)?;
1161                            merge_with_clauses(
1162                                &mut metadata,
1163                                query.ttl_ms,
1164                                query.expires_at_ms,
1165                                &query.with_metadata,
1166                            );
1167                            let (columns, values) = pairwise_columns_values(&document_values);
1168                            let body_str = find_column_value_string(&columns, &values, "body")?;
1169                            let body: crate::json::Value = crate::json::from_str(&body_str)
1170                                .map_err(|e| {
1171                                    RedDBError::Query(format!("invalid JSON body: {e}"))
1172                                })?;
1173                            let input = CreateDocumentInput {
1174                                collection: query.table.clone(),
1175                                body,
1176                                metadata,
1177                                node_links: Vec::new(),
1178                                vector_links: Vec::new(),
1179                            };
1180                            let output = self.create_document(input)?;
1181                            if query.returning.is_some() {
1182                                let fields = output
1183                                    .entity
1184                                    .as_ref()
1185                                    .map(entity_row_fields_snapshot)
1186                                    .filter(|fields| !fields.is_empty())
1187                                    .unwrap_or(document_values);
1188                                returning_field_snaps.push(fields);
1189                            }
1190                            entity_outputs.push(output);
1191                        }
1192                        InsertEntityType::Kv => {
1193                            let (kv_values, mut metadata) =
1194                                split_insert_metadata(self, &query.columns, row_values)?;
1195                            merge_with_clauses(
1196                                &mut metadata,
1197                                query.ttl_ms,
1198                                query.expires_at_ms,
1199                                &query.with_metadata,
1200                            );
1201                            let (columns, values) = pairwise_columns_values(&kv_values);
1202                            let key = find_column_value_string(&columns, &values, "key")?;
1203                            let value = find_column_value(&columns, &values, "value")?;
1204                            if query.returning.is_some() {
1205                                returning_field_snaps.push(kv_values.clone());
1206                            }
1207                            let input = CreateKvInput {
1208                                collection: query.table.clone(),
1209                                key,
1210                                value,
1211                                metadata,
1212                            };
1213                            entity_outputs.push(self.create_kv(input)?);
1214                        }
1215                    }
1216
1217                    inserted_count += 1;
1218                }
1219            }
1220
1221            if let Some(items) = query.returning.as_ref() {
1222                if !entity_outputs.is_empty() {
1223                    returning_result = Some(build_returning_result(
1224                        items,
1225                        &returning_field_snaps,
1226                        Some(&entity_outputs),
1227                    ));
1228                }
1229            }
1230        }
1231
1232        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
1233        if let Some(ref embed_config) = query.auto_embed {
1234            let store = self.inner.db.store();
1235            let provider = crate::ai::parse_provider(&embed_config.provider)?;
1236            let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
1237            let model = embed_config.model.clone().unwrap_or_else(|| {
1238                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1239                    .ok()
1240                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1241            });
1242
1243            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
1244            let manager = store
1245                .get_collection(&query.table)
1246                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1247            let entities = manager.query_all(|_| true);
1248            let recent: Vec<_> = entities
1249                .into_iter()
1250                .rev()
1251                .take(effective_rows.len())
1252                .collect();
1253
1254            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
1255            let entity_combos: Vec<(usize, String)> = recent
1256                .iter()
1257                .enumerate()
1258                .filter_map(|(i, entity)| {
1259                    if let EntityData::Row(ref row) = entity.data {
1260                        if let Some(ref named) = row.named {
1261                            let texts: Vec<String> = embed_config
1262                                .fields
1263                                .iter()
1264                                .filter_map(|field| match named.get(field) {
1265                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1266                                    _ => None,
1267                                })
1268                                .collect();
1269                            if !texts.is_empty() {
1270                                return Some((i, texts.join(" ")));
1271                            }
1272                        }
1273                    }
1274                    None
1275                })
1276                .collect();
1277
1278            if !entity_combos.is_empty() {
1279                // Batch phase: single provider round-trip for all rows.
1280                let batch_texts: Vec<String> =
1281                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
1282
1283                let batch_client =
1284                    crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1285
1286                let embeddings = match tokio::runtime::Handle::try_current() {
1287                    Ok(handle) => tokio::task::block_in_place(|| {
1288                        handle.block_on(batch_client.embed_batch(
1289                            &provider,
1290                            &model,
1291                            &api_key,
1292                            batch_texts,
1293                        ))
1294                    }),
1295                    Err(_) => {
1296                        return Err(RedDBError::Query(
1297                            "AUTO EMBED requires a Tokio runtime context".to_string(),
1298                        ));
1299                    }
1300                }
1301                .map_err(|e| RedDBError::Query(e.to_string()))?;
1302
1303                // Distribute phase: persist one vector per non-empty embedding.
1304                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1305                    if dense.is_empty() {
1306                        continue;
1307                    }
1308                    self.create_vector(CreateVectorInput {
1309                        collection: query.table.clone(),
1310                        dense,
1311                        content: Some(combined.clone()),
1312                        metadata: Vec::new(),
1313                        link_row: None,
1314                        link_node: None,
1315                    })?;
1316                }
1317            }
1318        }
1319
1320        if inserted_count > 0 {
1321            self.note_table_write(&query.table);
1322        }
1323
1324        let mut result = RuntimeQueryResult::dml_result(
1325            raw_query.to_string(),
1326            inserted_count,
1327            "insert",
1328            "runtime-dml",
1329        );
1330        if let Some(returning) = returning_result {
1331            result.result = returning;
1332        }
1333        Ok(result)
1334    }
1335
1336    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1337        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1338            return Ok(());
1339        };
1340        if !auth_store.iam_authorization_enabled() {
1341            return Ok(());
1342        }
1343        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1344            return Ok(());
1345        };
1346
1347        let tenant = crate::runtime::impl_core::current_tenant();
1348        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1349        let request = crate::auth::ColumnAccessRequest {
1350            action: "insert".to_string(),
1351            schema: None,
1352            table: query.table.clone(),
1353            columns: query.columns.clone(),
1354        };
1355        let ctx = crate::auth::policies::EvalContext {
1356            principal_tenant: tenant.clone(),
1357            current_tenant: tenant,
1358            peer_ip: None,
1359            mfa_present: false,
1360            now_ms: crate::auth::now_ms(),
1361            principal_is_admin_role: role == crate::auth::Role::Admin,
1362            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
1363            principal_is_platform_scoped: principal.tenant.is_none(),
1364        };
1365
1366        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1367        let table_allowed = matches!(
1368            outcome.table_decision,
1369            crate::auth::policies::Decision::Allow { .. }
1370                | crate::auth::policies::Decision::AdminBypass
1371        );
1372        if !table_allowed {
1373            return Err(RedDBError::Query(format!(
1374                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1375                outcome.table_resource.kind, outcome.table_resource.name
1376            )));
1377        }
1378        if let Some(denied) = outcome.first_denied_column() {
1379            return Err(RedDBError::Query(format!(
1380                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1381                denied.resource.kind, denied.resource.name
1382            )));
1383        }
1384
1385        Ok(())
1386    }
1387
1388    pub(crate) fn insert_timeseries_point(
1389        &self,
1390        collection: &str,
1391        fields: Vec<(String, Value)>,
1392        mut metadata: Vec<(String, MetadataValue)>,
1393    ) -> RedDBResult<EntityId> {
1394        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1395
1396        let (columns, values) = pairwise_columns_values(&fields);
1397        validate_timeseries_insert_columns(&columns)?;
1398
1399        // Issue #577 — AnalyticsSchemaRegistry hook. If the row carries
1400        // an `event_name` whose schema is registered, validate the
1401        // `payload` JSON against it BEFORE any write side-effect. On
1402        // failure we return a typed error and the row is not
1403        // persisted. When no schema is registered for the event name
1404        // (or no `event_name` column is supplied at all) we fall
1405        // through to the normal write path for back-compat with
1406        // existing timeseries rows.
1407        let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1408        let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1409        if let Some(event_name) = event_name_opt.as_deref() {
1410            let store_for_schema = self.inner.db.store();
1411            if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1412                .is_some()
1413            {
1414                let payload_json = payload_opt.as_deref().unwrap_or("{}");
1415                super::analytics_schema_registry::validate(
1416                    store_for_schema.as_ref(),
1417                    event_name,
1418                    payload_json,
1419                )
1420                .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1421            }
1422        }
1423
1424        // `metric` is required by the existing timeseries write path;
1425        // when an analytics-style row supplies `event_name` but not
1426        // `metric`, fall back to the event name so the storage path
1427        // still has a non-empty metric tag.
1428        let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1429            Some(m) => m,
1430            None => event_name_opt.clone().ok_or_else(|| {
1431                RedDBError::Query(
1432                    "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1433                )
1434            })?,
1435        };
1436        // `value` is optional for analytics-event rows (which are
1437        // semantically counts of 1); default to 1.0 when missing so
1438        // analytics inserts don't have to fabricate a metric value.
1439        let value = match find_column_value_opt_string(&columns, &values, "value") {
1440            Some(s) => s.parse::<f64>().unwrap_or(1.0),
1441            None => columns
1442                .iter()
1443                .position(|c| c.eq_ignore_ascii_case("value"))
1444                .and_then(|i| match &values[i] {
1445                    Value::Float(f) => Some(*f),
1446                    Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1447                    Value::UnsignedInteger(n) => Some(*n as f64),
1448                    _ => None,
1449                })
1450                .unwrap_or(1.0),
1451        };
1452        let timestamp_ns =
1453            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1454        let mut tags = find_timeseries_tags(&columns, &values)?;
1455        if let Some(ref name) = event_name_opt {
1456            tags.entry("event_name".to_string())
1457                .or_insert_with(|| name.clone());
1458        }
1459        if let Some(ref payload) = payload_opt {
1460            tags.entry("payload".to_string())
1461                .or_insert_with(|| payload.clone());
1462        }
1463
1464        let mut entity = UnifiedEntity::new(
1465            EntityId::new(0),
1466            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1467                series: collection.to_string(),
1468                metric: metric.clone(),
1469            })),
1470            EntityData::TimeSeries(crate::storage::TimeSeriesData {
1471                metric,
1472                timestamp_ns,
1473                value,
1474                tags,
1475            }),
1476        );
1477        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
1478        // or an autocommit xid (allocated and committed up-front so
1479        // future snapshots see the row as soon as it lands).
1480        let writer_xid = match self.current_xid() {
1481            Some(xid) => xid,
1482            None => {
1483                let mgr = self.snapshot_manager();
1484                let xid = mgr.begin();
1485                mgr.commit(xid);
1486                xid
1487            }
1488        };
1489        entity.set_xmin(writer_xid);
1490
1491        let store = self.inner.db.store();
1492        let id = store
1493            .insert_auto(collection, entity)
1494            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1495
1496        if !metadata.is_empty() {
1497            let _ = store.set_metadata(
1498                collection,
1499                id,
1500                Metadata::with_fields(metadata.into_iter().collect()),
1501            );
1502        }
1503
1504        self.cdc_emit(
1505            crate::replication::cdc::ChangeOperation::Insert,
1506            collection,
1507            id.raw(),
1508            "timeseries",
1509        );
1510
1511        Ok(id)
1512    }
1513
    /// Execute `UPDATE table SET col = expr, ... [WHERE filter] [RETURNING ...]`.
    ///
    /// Checks run in this order, so an earlier failure wins:
    /// 1. the DML write gate;
    /// 2. the blockchain-immutability check;
    /// 3. the collection-contract gate;
    /// 4. the update-target and graph-identity checks.
    ///
    /// When RLS is enabled for the table, the table's UPDATE policy is ANDed
    /// into the WHERE filter before any scan happens. If RLS is enabled but no
    /// policy admits the caller, a zero-row result (with an empty RETURNING
    /// set when requested) is returned without scanning.
    ///
    /// With RETURNING, the projection is built from post-update snapshots of
    /// exactly the ids the inner executor reports as touched. Graph targets
    /// use graph snapshots; everything else uses row snapshots.
    ///
    /// # Errors
    /// `RedDBError::InvalidOperation` for blockchain collections. Otherwise,
    /// whatever the gates or the inner executor return.
    pub fn execute_update(
        &self,
        raw_query: &str,
        query: &UpdateQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // Issue #523 — blockchain collections are immutable. Reject before
        // RLS / RETURNING work so the operator sees a clean 409-mapped
        // error instead of a partially-applied mutation surface.
        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
            return Err(RedDBError::InvalidOperation(format!(
                "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
                query.table
            )));
        }
        // CollectionContract gate (#50): runs the APPEND ONLY guard
        // (and any future contract bits) before RLS / RETURNING work
        // so the operator's immutability declaration is honoured
        // uniformly and the error message points at the DDL rather
        // than at a downstream symptom.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Update,
        )?;
        ensure_update_target_contract(self, &query.table, query.target)?;
        ensure_graph_identity_update_target_allowed(query)?;

        // Apply RLS augmentation first so every downstream path — plain
        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
        // same policy-filtered target set. This prevents RETURNING
        // from ever exposing rows the UPDATE policy would have
        // denied.
        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
        // Declared outside the `if` (deferred initialization) so that
        // `effective_query` can borrow the augmented copy for the rest of
        // the function; only initialized on the RLS path.
        let augmented_query: UpdateQuery;
        let effective_query: &UpdateQuery = if rls_gated {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Update,
            );
            let Some(policy) = rls_filter else {
                // No admitting policy: zero rows affected, empty
                // RETURNING (never leak rows the caller can't touch).
                let mut response = RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "update",
                    "runtime-dml-rls",
                );
                if let Some(items) = query.returning.clone() {
                    response.result = build_returning_result(&items, &[], None);
                }
                return Ok(response);
            };
            // Combine the user's WHERE (if any) with the policy predicate.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            augmented_query = augmented;
            &augmented_query
        } else {
            query
        };

        // RETURNING wraps the inner executor and uses the touched-id
        // list the inner reports so the post-image reflects exactly
        // the rows the UPDATE actually mutated (not whatever a
        // separate SELECT might have observed).
        if let Some(items) = effective_query.returning.clone() {
            // Strip RETURNING so the inner executor runs a plain UPDATE.
            let mut inner_query = effective_query.clone();
            inner_query.returning = None;
            let (mut response, touched_ids) =
                self.execute_update_inner_tracked(raw_query, &inner_query)?;

            let snapshots = if matches!(
                effective_query.target,
                UpdateTarget::Nodes | UpdateTarget::Edges
            ) {
                graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
            } else {
                super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
                    .row_snapshots(&touched_ids)
            };

            response.result = build_returning_result(&items, &snapshots, None);
            response.engine = "runtime-dml-returning";
            return Ok(response);
        }

        self.execute_update_inner(raw_query, effective_query)
    }
1613
1614    /// Back-compat shim: the older entry point ignored touched ids.
1615    fn execute_update_inner(
1616        &self,
1617        raw_query: &str,
1618        query: &UpdateQuery,
1619    ) -> RedDBResult<RuntimeQueryResult> {
1620        self.execute_update_inner_tracked(raw_query, query)
1621            .map(|(res, _)| res)
1622    }
1623
1624    fn execute_update_inner_tracked(
1625        &self,
1626        raw_query: &str,
1627        query: &UpdateQuery,
1628    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1629        let store = self.inner.db.store();
1630        let effective_filter = effective_update_filter(query);
1631        let compiled_plan = self.compile_update_plan(query)?;
1632        let mut touched_ids: Vec<EntityId> = Vec::new();
1633        let limit_cap = query.limit.map(|l| l as usize);
1634        let manager = store
1635            .get_collection(&query.table)
1636            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1637        let scan_limit = if query.order_by.is_empty() {
1638            limit_cap
1639        } else {
1640            None
1641        };
1642        let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
1643            self,
1644            &query.table,
1645            effective_filter.as_ref(),
1646            scan_limit,
1647            query.target,
1648        )
1649        .find_target_ids()?;
1650        let ids_to_update = if query.order_by.is_empty() {
1651            ids_to_update
1652        } else {
1653            ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1654        };
1655
1656        if update_needs_rmw_lock(query) {
1657            return self.execute_update_inner_tracked_locked(
1658                raw_query,
1659                query,
1660                &compiled_plan,
1661                &ids_to_update,
1662                effective_filter.as_ref(),
1663            );
1664        }
1665
1666        let mut affected: u64 = 0;
1667        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1668            let mut applied_chunk = Vec::with_capacity(chunk.len());
1669            for entity in manager.get_many(chunk).into_iter().flatten() {
1670                let assignments =
1671                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1672                let applied = self.apply_materialized_update_for_entity(
1673                    query.table.clone(),
1674                    entity,
1675                    &compiled_plan,
1676                    assignments,
1677                )?;
1678                touched_ids.push(applied.id);
1679                applied_chunk.push(applied);
1680            }
1681            self.persist_update_chunk(&applied_chunk)?;
1682            affected += applied_chunk.len() as u64;
1683            let lsns = self.flush_update_chunk(&applied_chunk)?;
1684            if !query.suppress_events {
1685                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1686            }
1687        }
1688
1689        if affected > 0 {
1690            self.note_table_write(&query.table);
1691        }
1692
1693        Ok((
1694            RuntimeQueryResult::dml_result(
1695                raw_query.to_string(),
1696                affected,
1697                "update",
1698                "runtime-dml",
1699            ),
1700            touched_ids,
1701        ))
1702    }
1703
    /// Apply an UPDATE to `ids_to_update` while holding per-row RMW locks.
    ///
    /// `execute_update_inner_tracked` calls this when `update_needs_rmw_lock`
    /// returns true. `ids_to_update` has already been filtered, ordered and
    /// limited by the caller.
    ///
    /// 1. For every id still present in the store, fetch the `row:<logical
    ///    id>` lock from `rmw_locks`.
    /// 2. Sort the entries by lock key, deduplicate them, and only then
    ///    acquire the locks, so all callers of this path take locks in the
    ///    same order.
    /// 3. Under the locks, re-resolve each row by logical id and re-check it
    ///    against `effective_filter`, because the row may have changed since
    ///    the scan.
    /// 4. Apply all surviving rows, then persist, flush, and (unless
    ///    `suppress_events`) emit them as a single chunk.
    ///
    /// Returns the DML result plus the updated ids, in lock-key order. Lock
    /// keys are compared as strings, so `row:10` sorts before `row:9`.
    ///
    /// NOTE(review): because application follows lock-key order, the touched
    /// ids (and therefore RETURNING output) do not follow any `ORDER BY` the
    /// caller used to choose `ids_to_update`. Confirm this is acceptable.
    fn execute_update_inner_tracked_locked(
        &self,
        raw_query: &str,
        query: &UpdateQuery,
        compiled_plan: &CompiledUpdatePlan,
        ids_to_update: &[EntityId],
        effective_filter: Option<&Filter>,
    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
        let store = self.inner.db.store();
        let mut touched_ids = Vec::new();
        let mut lock_entries = Vec::new();

        // Collect (lock key, logical id, lock handle) for ids that still
        // resolve; vanished ids are skipped.
        for id in ids_to_update {
            let Some(candidate) = store.get(&query.table, *id) else {
                continue;
            };
            let logical_id = candidate.logical_id();
            let lock_key = format!("row:{}", logical_id.raw());
            let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
            lock_entries.push((lock_key, logical_id, rmw_lock));
        }

        // Deterministic acquisition order, and never lock the same row twice
        // (several physical ids may share one logical id).
        lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
        lock_entries.dedup_by(|left, right| left.0 == right.0);
        // Bound to a named variable (not `_`) so the guards stay alive until
        // the function returns, covering apply, persist, flush and emit.
        let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();

        let mut applied_chunk = Vec::new();
        for (_, logical_id, _) in &lock_entries {
            // Re-read under the lock; the row may have been removed.
            let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
            else {
                continue;
            };
            // Re-check the filter against the locked version of the row.
            if let Some(filter) = effective_filter {
                if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
                    Some(self.inner.db.as_ref()),
                    &entity,
                    filter,
                    &query.table,
                    &query.table,
                ) {
                    continue;
                }
            }

            let assignments =
                self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
            let applied = self.apply_materialized_update_for_entity(
                query.table.clone(),
                entity,
                compiled_plan,
                assignments,
            )?;
            touched_ids.push(applied.id);
            applied_chunk.push(applied);
        }

        let affected = applied_chunk.len() as u64;
        if !applied_chunk.is_empty() {
            self.persist_update_chunk(&applied_chunk)?;
            let lsns = self.flush_update_chunk(&applied_chunk)?;
            if !query.suppress_events {
                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
            }
        }

        if affected > 0 {
            self.note_table_write(&query.table);
        }

        Ok((
            RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "update",
                "runtime-dml",
            ),
            touched_ids,
        ))
    }
1783
1784    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1785        let mut static_field_assignments = Vec::new();
1786        let mut static_metadata_assignments = Vec::new();
1787        let mut dynamic_assignments = Vec::new();
1788        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1789        let mut row_modified_columns = Vec::new();
1790
1791        for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1792            let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1793            let metadata_key = resolve_sql_ttl_metadata_key(column);
1794            if compound_op.is_some() && metadata_key.is_some() {
1795                return Err(RedDBError::Query(format!(
1796                    "compound assignment is only supported for row fields: {column}"
1797                )));
1798            }
1799            if compound_op.is_none() {
1800                if let Ok(value) = fold_expr_to_value(expr.clone()) {
1801                    if let Some(metadata_key) = metadata_key {
1802                        let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1803                        let (canonical_key, canonical_value) =
1804                            canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1805                        static_metadata_assignments
1806                            .push((canonical_key.to_string(), canonical_value));
1807                    } else {
1808                        let value = self.resolve_crypto_sentinel(value)?;
1809                        static_field_assignments.push((
1810                            column.clone(),
1811                            normalize_row_update_assignment_with_plan(
1812                                &query.table,
1813                                column,
1814                                value,
1815                                row_contract_plan.as_ref(),
1816                            )?,
1817                        ));
1818                        row_modified_columns.push(column.clone());
1819                    }
1820                    continue;
1821                }
1822            }
1823
1824            dynamic_assignments.push(CompiledUpdateAssignment {
1825                column: column.clone(),
1826                expr: expr.clone(),
1827                compound_op,
1828                metadata_key,
1829                row_rule: if metadata_key.is_none() {
1830                    if let Some(plan) = row_contract_plan.as_ref() {
1831                        if plan.timestamps_enabled
1832                            && (column == "created_at" || column == "updated_at")
1833                        {
1834                            return Err(RedDBError::Query(format!(
1835                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1836                                query.table, column
1837                            )));
1838                        }
1839                        if let Some(rule) = plan.declared_rules.get(column) {
1840                            Some(rule.clone())
1841                        } else if plan.strict_schema {
1842                            return Err(RedDBError::Query(format!(
1843                                "collection '{}' is strict and does not allow undeclared fields: {}",
1844                                query.table, column
1845                            )));
1846                        } else {
1847                            None
1848                        }
1849                    } else {
1850                        None
1851                    }
1852                } else {
1853                    None
1854                },
1855            });
1856            if metadata_key.is_none() {
1857                row_modified_columns.push(column.clone());
1858            }
1859        }
1860
1861        let row_modified_columns = dedupe_update_columns(row_modified_columns);
1862        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1863            row_modified_columns.iter().any(|column| {
1864                plan.unique_columns
1865                    .keys()
1866                    .any(|unique| unique.eq_ignore_ascii_case(column))
1867            })
1868        });
1869
1870        if let Some(ttl_ms) = query.ttl_ms {
1871            static_metadata_assignments
1872                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1873        }
1874        if let Some(expires_at_ms) = query.expires_at_ms {
1875            static_metadata_assignments.push((
1876                "_expires_at".to_string(),
1877                metadata_u64_to_value(expires_at_ms),
1878            ));
1879        }
1880        for (key, val) in &query.with_metadata {
1881            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1882        }
1883
1884        Ok(CompiledUpdatePlan {
1885            static_field_assignments,
1886            static_metadata_assignments,
1887            dynamic_assignments,
1888            row_contract_plan,
1889            row_modified_columns,
1890            row_touches_unique_columns,
1891        })
1892    }
1893
1894    fn materialize_update_assignments_for_entity(
1895        &self,
1896        query: &UpdateQuery,
1897        entity: &UnifiedEntity,
1898        compiled_plan: &CompiledUpdatePlan,
1899    ) -> RedDBResult<MaterializedUpdateAssignments> {
1900        let mut assignments = MaterializedUpdateAssignments::default();
1901        let mut record: Option<UnifiedRecord> = None;
1902
1903        for assignment in &compiled_plan.dynamic_assignments {
1904            if assignment.compound_op.is_some()
1905                && !matches!(
1906                    entity.data,
1907                    EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
1908                )
1909            {
1910                return Err(RedDBError::Query(format!(
1911                    "compound assignment is only supported for row or graph UPDATE column '{}'",
1912                    assignment.column
1913                )));
1914            }
1915            if record.is_none() {
1916                record = runtime_any_record_from_entity_ref(entity);
1917            }
1918            let Some(record) = record.as_ref() else {
1919                return Err(RedDBError::Query(format!(
1920                    "UPDATE could not materialize runtime record for entity {} in '{}'",
1921                    entity.id.raw(),
1922                    query.table
1923                )));
1924            };
1925            let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
1926                Some(self.inner.db.as_ref()),
1927                &assignment.expr,
1928                record,
1929                Some(query.table.as_str()),
1930                Some(query.table.as_str()),
1931            )
1932            .ok_or_else(|| {
1933                RedDBError::Query(format!(
1934                    "failed to evaluate UPDATE expression for column '{}'",
1935                    assignment.column
1936                ))
1937            })?;
1938            let value = if let Some(op) = assignment.compound_op {
1939                evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
1940            } else {
1941                rhs
1942            };
1943
1944            if let Some(metadata_key) = assignment.metadata_key {
1945                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1946                let (canonical_key, canonical_value) =
1947                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1948                assignments
1949                    .dynamic_metadata_assignments
1950                    .push((canonical_key.to_string(), canonical_value));
1951            } else {
1952                assignments.dynamic_field_assignments.push((
1953                    assignment.column.clone(),
1954                    normalize_row_update_value_for_rule(
1955                        &query.table,
1956                        self.resolve_crypto_sentinel(value)?,
1957                        assignment.row_rule.as_ref(),
1958                    )?,
1959                ));
1960            }
1961        }
1962
1963        Ok(assignments)
1964    }
1965
1966    fn apply_materialized_update_for_entity(
1967        &self,
1968        collection: String,
1969        entity: UnifiedEntity,
1970        compiled_plan: &CompiledUpdatePlan,
1971        assignments: MaterializedUpdateAssignments,
1972    ) -> RedDBResult<AppliedEntityMutation> {
1973        if matches!(entity.data, EntityData::Row(_)) {
1974            return self.apply_loaded_sql_update_row_core(
1975                collection,
1976                entity,
1977                &compiled_plan.static_field_assignments,
1978                assignments.dynamic_field_assignments,
1979                &compiled_plan.static_metadata_assignments,
1980                assignments.dynamic_metadata_assignments,
1981                compiled_plan.row_contract_plan.as_ref(),
1982                &compiled_plan.row_modified_columns,
1983                compiled_plan.row_touches_unique_columns,
1984            );
1985        }
1986
1987        ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
1988
1989        let operations = build_patch_operations_from_materialized_assignments(
1990            &entity,
1991            compiled_plan,
1992            assignments,
1993        );
1994        self.apply_loaded_patch_entity_core(
1995            collection,
1996            entity,
1997            crate::json::Value::Null,
1998            operations,
1999        )
2000    }
2001
    /// Execute `DELETE FROM table WHERE filter`.
    ///
    /// Checks run in this order: the DML write gate (`check_write`), the
    /// blockchain-immutability gate, and the collection-contract gate. After
    /// that, either the RETURNING path runs (it re-enters this method with
    /// RETURNING stripped) or the RLS-aware dispatch to
    /// `execute_delete_inner` runs.
    ///
    /// # Errors
    /// - `RedDBError::InvalidOperation` when the table is a blockchain
    ///   collection.
    /// - Propagates write-gate and contract-gate errors.
    /// - `RedDBError::Query` when a `DELETE … RETURNING` statement cannot be
    ///   rewritten into its pre-image `SELECT`.
    pub fn execute_delete(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // Issue #523 — blockchain collections are immutable; see
        // execute_update for the same gate.
        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
            return Err(RedDBError::InvalidOperation(format!(
                "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
                query.table
            )));
        }
        // CollectionContract gate (#50) — see execute_update for
        // rationale. The gate handles APPEND ONLY rejection and is
        // the single point where future contract bits land.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Delete,
        )?;

        // RETURNING on DELETE: capture the pre-image via an internal
        // SELECT that reuses the same WHERE, then run the delete with
        // the RETURNING clause stripped, then project the captured
        // rows through the requested items. The extra SELECT is a
        // pragmatic MVP — a future pass can fuse the scan with the
        // delete to avoid the second pass over the heap.
        //
        // NOTE(review): the pre-image SELECT is built from the raw SQL, so
        // it does not include the `FOR DELETE` RLS predicate folded in
        // further below. Also, `affected` is the snapshot count, not the
        // count reported by the inner DELETE (whose result is discarded).
        // When RLS filters out rows, RETURNING may therefore report rows
        // that were not actually removed. Confirm whether this is intended.
        if let Some(items) = query.returning.clone() {
            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
                RedDBError::Query(
                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
                )
            })?;
            let captured = self.execute_query(&select_sql)?;

            // Re-enter with RETURNING stripped: the gates above run again
            // and the RLS dispatch below applies to the real delete.
            let mut inner_query = query.clone();
            inner_query.returning = None;
            let _ = self.execute_delete(raw_query, &inner_query)?;

            // Flatten each captured record into (column, value) pairs for
            // the shared RETURNING projector.
            let snapshots: Vec<Vec<(String, Value)>> = captured
                .result
                .records
                .iter()
                .map(|rec| {
                    rec.iter_fields()
                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
                        .collect()
                })
                .collect();
            let affected = snapshots.len() as u64;
            let result = build_returning_result(&items, &snapshots, None);

            let mut response = RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "delete",
                "runtime-dml-returning",
            );
            response.result = result;
            return Ok(response);
        }
        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
        //
        // When the table has RLS enabled, gate the DELETE by the
        // per-role policy set: mutations only touch rows that *every*
        // matching `FOR DELETE` policy would accept. No policies =>
        // zero rows affected (PG restrictive-default).
        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Delete,
            );
            let Some(policy) = rls_filter else {
                return Ok(RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "delete",
                    "runtime-dml-rls",
                ));
            };
            // Fold the policy predicate into the user's WHERE before
            // dispatching — the remainder of this function reads the
            // filter from `query` via `effective_delete_filter`, which
            // respects the updated value.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            return self.execute_delete_inner(raw_query, &augmented);
        }
        self.execute_delete_inner(raw_query, query)
    }
2101
    /// Delete every entity of `query.table` matched by the effective filter.
    ///
    /// Deletion runs in chunks of `UPDATE_APPLY_CHUNK_SIZE`. Delete events
    /// are emitted only when the collection has delete subscriptions and the
    /// query does not suppress them. This method runs no write, blockchain,
    /// contract or RLS gate itself; `execute_delete` applies those before
    /// calling it.
    ///
    /// Returns a DML result whose affected count is the sum of the counts
    /// reported by `delete_entities_batch`.
    fn execute_delete_inner(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        let effective_filter = effective_delete_filter(query);

        // Find the rows that match the WHERE clause. The "find target
        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
        // the same scan strategy.
        let scan = super::dml_target_scan::DmlTargetScan::new(
            self,
            &query.table,
            effective_filter.as_ref(),
            None,
        );
        let ids_to_delete = scan.find_target_ids()?;

        // For event-enabled collections, snapshot the pre-delete state
        // before rows are physically removed.
        let needs_delete_events =
            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
            scan.row_json_pre_images(&ids_to_delete)
        } else {
            HashMap::new()
        };

        let mut affected: u64 = 0;
        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
            affected += count;
            if needs_delete_events && !lsns.is_empty() {
                // lsns.len() == actually-deleted entities; align with chunk ids.
                // `delete_batch` may skip missing entities, so we correlate by
                // the number returned (they're emitted in chunk order).
                //
                // NOTE(review): a prefix of `chunk` lines up with `lsns` only
                // when all skipped ids sit at the tail of the chunk. A missing
                // id in the middle would attribute events (and pre-images) to
                // the wrong ids. Confirm against `delete_entities_batch`.
                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
                self.emit_delete_events_for_collection(
                    &query.table,
                    deleted_chunk,
                    &lsns,
                    &pre_images,
                )?;
            }
        }
        // The pre-images are not needed past this point; release them
        // before the table-write bookkeeping below.
        pre_images.clear();

        if affected > 0 {
            self.note_table_write(&query.table);
        }

        Ok(RuntimeQueryResult::dml_result(
            raw_query.to_string(),
            affected,
            "delete",
            "runtime-dml",
        ))
    }
2160}
2161
2162/// Reject UPDATE … NODES/EDGES that assign to graph identity/topology
2163/// columns regardless of whether any row matches the WHERE clause. The
2164/// per-entity guard below covers only the matched-rows case, but ADR 0019
2165/// declares these columns immutable on the surface itself, so a zero-row
2166/// UPDATE should still surface the same error to operators and SDKs.
2167fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2168    if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2169        return Ok(());
2170    }
2171    for (column, _) in &query.assignment_exprs {
2172        if is_immutable_graph_identity_field(column) {
2173            return Err(RedDBError::Query(format!(
2174                "immutable graph field '{column}' cannot be updated"
2175            )));
2176        }
2177    }
2178    Ok(())
2179}
2180
2181fn ensure_graph_identity_update_allowed(
2182    entity: &UnifiedEntity,
2183    compiled_plan: &CompiledUpdatePlan,
2184    assignments: &MaterializedUpdateAssignments,
2185) -> RedDBResult<()> {
2186    if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2187        return Ok(());
2188    }
2189
2190    for (column, _) in compiled_plan
2191        .static_field_assignments
2192        .iter()
2193        .chain(assignments.dynamic_field_assignments.iter())
2194    {
2195        if is_immutable_graph_identity_field(column) {
2196            return Err(RedDBError::Query(format!(
2197                "immutable graph field '{column}' cannot be updated"
2198            )));
2199        }
2200    }
2201
2202    Ok(())
2203}
2204
/// Whether `column` names a graph identity/topology field that UPDATE must
/// never touch. The comparison is ASCII case-insensitive.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const IMMUTABLE_GRAPH_FIELDS: [&str; 6] = ["rid", "label", "from_rid", "to_rid", "from", "to"];
    for reserved in IMMUTABLE_GRAPH_FIELDS {
        if column.eq_ignore_ascii_case(reserved) {
            return true;
        }
    }
    false
}
2210
2211fn build_patch_operations_from_materialized_assignments(
2212    entity: &UnifiedEntity,
2213    compiled_plan: &CompiledUpdatePlan,
2214    assignments: MaterializedUpdateAssignments,
2215) -> Vec<PatchEntityOperation> {
2216    let mut operations = Vec::with_capacity(
2217        compiled_plan.static_field_assignments.len()
2218            + compiled_plan.static_metadata_assignments.len()
2219            + assignments.dynamic_field_assignments.len()
2220            + assignments.dynamic_metadata_assignments.len(),
2221    );
2222
2223    for (column, value) in &compiled_plan.static_field_assignments {
2224        operations.push(PatchEntityOperation {
2225            op: PatchEntityOperationType::Set,
2226            path: update_patch_path_for_entity(entity, column),
2227            value: Some(storage_value_to_json(value)),
2228        });
2229    }
2230
2231    for (column, value) in assignments.dynamic_field_assignments {
2232        operations.push(PatchEntityOperation {
2233            op: PatchEntityOperationType::Set,
2234            path: update_patch_path_for_entity(entity, &column),
2235            value: Some(storage_value_to_json(&value)),
2236        });
2237    }
2238
2239    for (key, value) in &compiled_plan.static_metadata_assignments {
2240        operations.push(PatchEntityOperation {
2241            op: PatchEntityOperationType::Set,
2242            path: vec!["metadata".to_string(), key.clone()],
2243            value: Some(metadata_value_to_json(value)),
2244        });
2245    }
2246
2247    for (key, value) in assignments.dynamic_metadata_assignments {
2248        operations.push(PatchEntityOperation {
2249            op: PatchEntityOperationType::Set,
2250            path: vec!["metadata".to_string(), key],
2251            value: Some(metadata_value_to_json(&value)),
2252        });
2253    }
2254
2255    operations
2256}
2257
2258fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2259    if matches!(
2260        (&entity.kind, &entity.data),
2261        (
2262            crate::storage::EntityKind::GraphNode(_),
2263            EntityData::Node(_)
2264        )
2265    ) && column.eq_ignore_ascii_case("node_type")
2266    {
2267        return vec!["node_type".to_string()];
2268    }
2269    if matches!(
2270        (&entity.kind, &entity.data),
2271        (
2272            crate::storage::EntityKind::GraphEdge(_),
2273            EntityData::Edge(_)
2274        )
2275    ) && column.eq_ignore_ascii_case("weight")
2276    {
2277        return vec!["weight".to_string()];
2278    }
2279    vec!["fields".to_string(), column.to_string()]
2280}
2281
2282/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
2283/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
2284/// capture the pre-image before actually removing the rows. Returns
2285/// `None` when the input does not start with `DELETE`.
2286///
2287/// Case-insensitive on the keywords. Preserves everything between
2288/// the table name and the RETURNING clause, so WHERE / ORDER BY /
2289/// LIMIT survive untouched. The RETURNING tail — if present — is
2290/// truncated at the first top-level `RETURNING` token.
2291fn delete_to_select_sql(sql: &str) -> Option<String> {
2292    let trimmed = sql.trim_start();
2293    let lowered = trimmed.to_ascii_lowercase();
2294    if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
2295        return None;
2296    }
2297    // Find `FROM` after DELETE.
2298    let from_idx = lowered.find(" from ")?;
2299    let after_from = &trimmed[from_idx + " from ".len()..];
2300    let after_from_lc = &lowered[from_idx + " from ".len()..];
2301
2302    // Cut off the RETURNING tail (a naive search — the RETURNING
2303    // clause only appears once per statement at top level in our
2304    // grammar). Matches whitespace-bounded tokens to avoid clipping
2305    // `RETURNING` inside a string literal.
2306    let mut body = after_from.to_string();
2307    if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
2308        body.truncate(pos);
2309    }
2310    Some(format!("SELECT * FROM {}", body.trim_end()))
2311}
2312
/// Find the byte offset of a whitespace-bounded keyword in a lowercased
/// haystack, skipping matches inside quoted spans.
///
/// Quoting rules:
/// - Single-quoted string literals (`'…'`) and double-quoted identifiers
///   (`"…"`) are both skipped.
/// - A `'` inside a quoted identifier, or a `"` inside a string literal,
///   does not change the quoting state.
/// - SQL doubled-quote escapes (`'it''s'`, `"a""b"`) work naturally: the
///   closing quote ends the span and the next quote reopens it.
/// - Backslash escapes and SQL comments are not recognised. That is enough
///   for the shapes the DML parser emits.
///
/// Match rules:
/// - `needle` must already be lowercase.
/// - An empty `needle` never matches.
/// - A match must be preceded by start of input or ASCII whitespace, and
///   followed by end of input or ASCII whitespace.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let pattern = needle.as_bytes();
    if pattern.is_empty() {
        return None;
    }

    // The quote byte that opened the span we are currently inside, if any.
    let mut open_quote: Option<u8> = None;
    for (i, &byte) in bytes.iter().enumerate() {
        if let Some(quote) = open_quote {
            if byte == quote {
                open_quote = None;
            }
            continue;
        }
        if byte == b'\'' || byte == b'"' {
            open_quote = Some(byte);
            continue;
        }
        if !bytes[i..].starts_with(pattern) {
            continue;
        }
        // `starts_with` guarantees `end <= bytes.len()`.
        let end = i + pattern.len();
        let bounded_before = i == 0 || bytes[i - 1].is_ascii_whitespace();
        let bounded_after = end == bytes.len() || bytes[end].is_ascii_whitespace();
        if bounded_before && bounded_after {
            return Some(i);
        }
    }
    None
}
2341
/// Build a `UnifiedResult` from the rows affected by a DML statement plus
/// its `RETURNING` clause.
///
/// Inputs:
/// - Each snapshot is a list of (column, value) pairs for one affected row.
/// - `outputs`, when provided, supplies the engine-assigned entity id for
///   the same row, matched by index (INSERT path).
///
/// Projection honours the RETURNING items:
/// - `*` expands to every column of the *first* snapshot, preceded by the
///   public envelope (`rid`, `collection`, `kind`, `tenant`, `created_at`,
///   `updated_at`) when the first output is a public item. Otherwise it is
///   preceded by `red_entity_id` when `outputs` is present.
/// - Explicit columns are projected in request order, de-duplicated.
///
/// Per row, snapshot values are inserted after the envelope values, so a
/// snapshot column with the same key overrides the envelope value. A
/// projected column absent from a row is simply left off that record.
fn build_returning_result(
    items: &[ReturningItem],
    snapshots: &[Vec<(String, Value)>],
    outputs: Option<&[CreateEntityOutput]>,
) -> UnifiedResult {
    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
    // Decided from the first output only; presumably all outputs of one
    // statement share an entity shape — TODO confirm.
    let public_item_outputs = outputs.is_some_and(|outs| {
        outs.first()
            .and_then(|out| out.entity.as_ref())
            .is_some_and(|entity| public_returning_item_kind(entity).is_some())
    });

    let mut columns: Vec<String> = if project_all {
        let mut cols: Vec<String> = Vec::new();
        if public_item_outputs {
            cols.extend(
                [
                    "rid",
                    "collection",
                    "kind",
                    "tenant",
                    "created_at",
                    "updated_at",
                ]
                .into_iter()
                .map(str::to_string),
            );
        } else if outputs.is_some() {
            cols.push("red_entity_id".to_string());
        }
        // Column order for `*` follows the first snapshot.
        if let Some(first) = snapshots.first() {
            for (name, _) in first {
                cols.push(name.clone());
            }
        }
        cols
    } else {
        items
            .iter()
            .filter_map(|it| match it {
                ReturningItem::Column(c) => Some(c.clone()),
                ReturningItem::All => None,
            })
            .collect()
    };
    // Guarantee unique order-preserving column list.
    {
        let mut seen = std::collections::HashSet::new();
        columns.retain(|c| seen.insert(c.clone()));
    }

    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
    for (idx, snap) in snapshots.iter().enumerate() {
        // All candidate values for this row; only projected ones are kept.
        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
        if let Some(outs) = outputs {
            if let Some(out) = outs.get(idx) {
                if let Some(entity) = out.entity.as_ref() {
                    if let Some(kind) = public_returning_item_kind(entity) {
                        values.insert(
                            Arc::clone(&sys_key_rid()),
                            Value::UnsignedInteger(out.id.raw()),
                        );
                        values.insert(
                            Arc::clone(&sys_key_collection()),
                            Value::text(entity.kind.collection().to_string()),
                        );
                        values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
                        values.insert(
                            Arc::clone(&sys_key_created_at()),
                            Value::UnsignedInteger(entity.created_at),
                        );
                        values.insert(
                            Arc::clone(&sys_key_updated_at()),
                            Value::UnsignedInteger(entity.updated_at),
                        );
                        // Legacy alias: an explicit `RETURNING red_entity_id`
                        // still resolves to the row's rid. Only surfaces when
                        // the projected column list names it — `RETURNING *`
                        // keeps the envelope clean (rid, not red_entity_id).
                        values.insert(
                            Arc::clone(&sys_key_red_entity_id()),
                            Value::UnsignedInteger(out.id.raw()),
                        );
                    } else {
                        // NOTE(review): `as i64` wraps for raw ids above
                        // i64::MAX; confirm ids never reach that range.
                        values.insert(
                            Arc::clone(&sys_key_red_entity_id()),
                            Value::Integer(out.id.raw() as i64),
                        );
                    }
                } else {
                    values.insert(
                        Arc::clone(&sys_key_red_entity_id()),
                        Value::Integer(out.id.raw() as i64),
                    );
                }
            }
        }
        // Snapshot values win over envelope values with the same key.
        for (name, val) in snap {
            values.insert(Arc::from(name.as_str()), val.clone());
        }
        // `tenant` falls back to a `tenant_id` column, else NULL.
        if !values.contains_key("tenant") {
            let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
            values.insert(Arc::clone(&sys_key_tenant()), tenant);
        }
        let mut rec = UnifiedRecord::default();
        // Only keep projected columns on the record.
        for col in &columns {
            if let Some(v) = values.get(col.as_str()) {
                rec.set_arc(Arc::from(col.as_str()), v.clone());
            }
        }
        records.push(rec);
    }

    UnifiedResult {
        columns,
        records,
        stats: Default::default(),
        pre_serialized_json: None,
    }
}
2469
2470fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2471    match (&entity.kind, &entity.data) {
2472        (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2473            Some("node")
2474        }
2475        (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2476            Some("edge")
2477        }
2478        (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2479        _ => None,
2480    }
2481}
2482
2483fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2484    let Some(row) = entity.data.as_row() else {
2485        return "row";
2486    };
2487
2488    let is_kv = row.named.as_ref().is_some_and(|named| {
2489        (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2490            || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2491    });
2492    if is_kv {
2493        return "kv";
2494    }
2495
2496    let is_document = row
2497        .named
2498        .as_ref()
2499        .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2500        || row.columns.iter().any(runtime_returning_documentish_value);
2501    if is_document {
2502        "document"
2503    } else {
2504        "row"
2505    }
2506}
2507
/// Whether `value` is a JSON payload or a blob. `public_returning_row_kind`
/// uses this to label a (non-kv) row as `"document"` in RETURNING output.
fn runtime_returning_documentish_value(value: &Value) -> bool {
    matches!(value, Value::Json(_) | Value::Blob(_))
}
2511
2512fn row_insert_returning_snapshots(
2513    outputs: &[CreateEntityOutput],
2514    fallback: Vec<Vec<(String, Value)>>,
2515) -> Vec<Vec<(String, Value)>> {
2516    outputs
2517        .iter()
2518        .enumerate()
2519        .map(|(idx, out)| {
2520            out.entity
2521                .as_ref()
2522                .map(entity_row_fields_snapshot)
2523                .filter(|snap| !snap.is_empty())
2524                .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2525        })
2526        .collect()
2527}
2528
2529fn graph_insert_returning_snapshots(
2530    store: &crate::storage::unified::UnifiedStore,
2531    collection: &str,
2532    ids: &[EntityId],
2533) -> Vec<Vec<(String, Value)>> {
2534    let Some(manager) = store.get_collection(collection) else {
2535        return Vec::new();
2536    };
2537
2538    ids.iter()
2539        .filter_map(|id| manager.get(*id))
2540        .filter_map(|entity| {
2541            let mut record = runtime_any_record_from_entity_ref(&entity)?;
2542            record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2543            Some(record)
2544        })
2545        .map(|record| {
2546            record
2547                .iter_fields()
2548                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2549                .collect()
2550        })
2551        .collect()
2552}
2553
2554fn graph_update_returning_snapshots(
2555    runtime: &RedDBRuntime,
2556    collection: &str,
2557    ids: &[EntityId],
2558) -> Vec<Vec<(String, Value)>> {
2559    let store = runtime.db().store();
2560    let Some(manager) = store.get_collection(collection) else {
2561        return Vec::new();
2562    };
2563
2564    manager
2565        .get_many(ids)
2566        .into_iter()
2567        .flatten()
2568        .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2569        .map(|record| {
2570            record
2571                .iter_fields()
2572                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2573                .collect()
2574        })
2575        .collect()
2576}
2577
2578fn ensure_update_target_contract(
2579    runtime: &RedDBRuntime,
2580    collection: &str,
2581    target: UpdateTarget,
2582) -> RedDBResult<()> {
2583    let Some(contract) = runtime.db().collection_contract(collection) else {
2584        return Ok(());
2585    };
2586    if update_target_contract_is_advisory(&contract)
2587        || update_target_allows_model(contract.declared_model, update_target_model(target))
2588    {
2589        return Ok(());
2590    }
2591    Err(RedDBError::InvalidOperation(format!(
2592        "collection '{}' is declared as '{}' and does not allow '{}' updates",
2593        collection,
2594        update_model_name(contract.declared_model),
2595        update_model_name(update_target_model(target))
2596    )))
2597}
2598
2599fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2600    matches!(
2601        (&contract.origin, &contract.schema_mode),
2602        (
2603            crate::physical::ContractOrigin::Implicit,
2604            crate::catalog::SchemaMode::Dynamic,
2605        )
2606    )
2607}
2608
2609fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2610    match target {
2611        UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2612        UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2613        UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2614        UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2615    }
2616}
2617
2618fn update_target_allows_model(
2619    declared_model: crate::catalog::CollectionModel,
2620    requested_model: crate::catalog::CollectionModel,
2621) -> bool {
2622    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2623}
2624
2625fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2626    match model {
2627        crate::catalog::CollectionModel::Table => "table",
2628        crate::catalog::CollectionModel::Document => "document",
2629        crate::catalog::CollectionModel::Graph => "graph",
2630        crate::catalog::CollectionModel::Vector => "vector",
2631        crate::catalog::CollectionModel::Hll => "hll",
2632        crate::catalog::CollectionModel::Sketch => "sketch",
2633        crate::catalog::CollectionModel::Filter => "filter",
2634        crate::catalog::CollectionModel::Kv => "kv",
2635        crate::catalog::CollectionModel::Config => "config",
2636        crate::catalog::CollectionModel::Vault => "vault",
2637        crate::catalog::CollectionModel::Mixed => "mixed",
2638        crate::catalog::CollectionModel::TimeSeries => "timeseries",
2639        crate::catalog::CollectionModel::Queue => "queue",
2640        crate::catalog::CollectionModel::Metrics => "metrics",
2641    }
2642}
2643
2644fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2645    let db = runtime.db();
2646    if let Some(contract) = db.collection_contract(collection) {
2647        let advisory_implicit_dynamic = matches!(
2648            (&contract.origin, &contract.schema_mode),
2649            (
2650                crate::physical::ContractOrigin::Implicit,
2651                crate::catalog::SchemaMode::Dynamic,
2652            )
2653        );
2654        if advisory_implicit_dynamic
2655            || matches!(
2656                contract.declared_model,
2657                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2658            )
2659        {
2660            return Ok(());
2661        }
2662        return Err(RedDBError::InvalidOperation(format!(
2663            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2664            collection, contract.declared_model
2665        )));
2666    }
2667
2668    let now = std::time::SystemTime::now()
2669        .duration_since(std::time::UNIX_EPOCH)
2670        .unwrap_or_default()
2671        .as_millis();
2672    db.save_collection_contract(crate::physical::CollectionContract {
2673        name: collection.to_string(),
2674        declared_model: crate::catalog::CollectionModel::Graph,
2675        schema_mode: crate::catalog::SchemaMode::Dynamic,
2676        origin: crate::physical::ContractOrigin::Implicit,
2677        version: 1,
2678        created_at_unix_ms: now,
2679        updated_at_unix_ms: now,
2680        default_ttl_ms: db.collection_default_ttl_ms(collection),
2681        vector_dimension: None,
2682        vector_metric: None,
2683        context_index_fields: Vec::new(),
2684        declared_columns: Vec::new(),
2685        table_def: None,
2686        timestamps_enabled: false,
2687        context_index_enabled: false,
2688        metrics_raw_retention_ms: None,
2689        metrics_rollup_policies: Vec::new(),
2690        metrics_tenant_identity: None,
2691        metrics_namespace: None,
2692        append_only: false,
2693        subscriptions: Vec::new(),
2694        session_key: None,
2695        session_gap_ms: None,
2696        retention_duration_ms: None,
2697    })
2698    .map(|_| ())
2699    .map_err(|err| RedDBError::Internal(err.to_string()))
2700}
2701
2702fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2703    query
2704        .assignment_exprs
2705        .iter()
2706        .enumerate()
2707        .any(|(idx, (column, expr))| {
2708            query
2709                .compound_assignment_ops
2710                .get(idx)
2711                .is_some_and(|op| op.is_some())
2712                || expr_references_update_column(expr, &query.table, column)
2713        })
2714}
2715
2716fn evaluate_compound_update_assignment(
2717    column: &str,
2718    record: &UnifiedRecord,
2719    op: BinOp,
2720    rhs: Value,
2721) -> RedDBResult<Value> {
2722    let lhs = record.get(column).ok_or_else(|| {
2723        RedDBError::Query(format!(
2724            "compound assignment requires existing numeric field '{column}'"
2725        ))
2726    })?;
2727    if matches!(lhs, Value::Null) {
2728        return Err(RedDBError::Query(format!(
2729            "compound assignment requires non-null numeric field '{column}'"
2730        )));
2731    }
2732    apply_compound_numeric_op(column, op, lhs, &rhs)
2733}
2734
2735fn apply_compound_numeric_op(
2736    column: &str,
2737    op: BinOp,
2738    lhs: &Value,
2739    rhs: &Value,
2740) -> RedDBResult<Value> {
2741    let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2742        return Err(RedDBError::Query(format!(
2743            "compound assignment requires numeric field '{column}'"
2744        )));
2745    };
2746    let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2747        return Err(RedDBError::Query(format!(
2748            "compound assignment requires numeric right-hand value for field '{column}'"
2749        )));
2750    };
2751
2752    if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2753        let a = lhs_number.as_f64();
2754        let b = rhs_number.as_f64();
2755        let out = match op {
2756            BinOp::Add => a + b,
2757            BinOp::Sub => a - b,
2758            BinOp::Mul => a * b,
2759            BinOp::Div => {
2760                if b == 0.0 {
2761                    return Err(RedDBError::Query(format!(
2762                        "division by zero in compound assignment for field '{column}'"
2763                    )));
2764                }
2765                a / b
2766            }
2767            BinOp::Mod => {
2768                if b == 0.0 {
2769                    return Err(RedDBError::Query(format!(
2770                        "modulo by zero in compound assignment for field '{column}'"
2771                    )));
2772                }
2773                a % b
2774            }
2775            _ => {
2776                return Err(RedDBError::Query(format!(
2777                    "unsupported compound assignment operator for field '{column}'"
2778                )));
2779            }
2780        };
2781        if !out.is_finite() {
2782            return Err(RedDBError::Query(format!(
2783                "numeric overflow in compound assignment for field '{column}'"
2784            )));
2785        }
2786        return Ok(Value::Float(out));
2787    }
2788
2789    let a = lhs_number.as_i128();
2790    let b = rhs_number.as_i128();
2791    let out = match op {
2792        BinOp::Add => a.checked_add(b),
2793        BinOp::Sub => a.checked_sub(b),
2794        BinOp::Mul => a.checked_mul(b),
2795        BinOp::Mod => {
2796            if b == 0 {
2797                return Err(RedDBError::Query(format!(
2798                    "modulo by zero in compound assignment for field '{column}'"
2799                )));
2800            }
2801            a.checked_rem(b)
2802        }
2803        BinOp::Div => unreachable!("integer division is handled by the float branch"),
2804        _ => None,
2805    }
2806    .ok_or_else(|| {
2807        RedDBError::Query(format!(
2808            "numeric overflow in compound assignment for field '{column}'"
2809        ))
2810    })?;
2811
2812    if matches!(lhs, Value::UnsignedInteger(_)) {
2813        let value = u64::try_from(out).map_err(|_| {
2814            RedDBError::Query(format!(
2815                "numeric overflow in compound assignment for field '{column}'"
2816            ))
2817        })?;
2818        Ok(Value::UnsignedInteger(value))
2819    } else {
2820        let value = i64::try_from(out).map_err(|_| {
2821            RedDBError::Query(format!(
2822                "numeric overflow in compound assignment for field '{column}'"
2823            ))
2824        })?;
2825        Ok(Value::Integer(value))
2826    }
2827}
2828
2829#[derive(Clone, Copy)]
2830enum CompoundNumber {
2831    Integer(i128),
2832    Float(f64),
2833}
2834
2835impl CompoundNumber {
2836    fn from_value(value: &Value) -> Option<Self> {
2837        match value {
2838            Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2839            Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2840            Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2841            Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2842            _ => None,
2843        }
2844    }
2845
2846    fn is_float(self) -> bool {
2847        matches!(self, Self::Float(_))
2848    }
2849
2850    fn as_f64(self) -> f64 {
2851        match self {
2852            Self::Integer(value) => value as f64,
2853            Self::Float(value) => value,
2854        }
2855    }
2856
2857    fn as_i128(self) -> i128 {
2858        match self {
2859            Self::Integer(value) => value,
2860            Self::Float(_) => unreachable!("float compound number used as integer"),
2861        }
2862    }
2863}
2864
2865fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2866    match expr {
2867        Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2868        Expr::Column { field, .. } => {
2869            field_ref_matches_update_column(field, table_name, target_column)
2870        }
2871        Expr::BinaryOp { lhs, rhs, .. } => {
2872            expr_references_update_column(lhs, table_name, target_column)
2873                || expr_references_update_column(rhs, table_name, target_column)
2874        }
2875        Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2876            expr_references_update_column(operand, table_name, target_column)
2877        }
2878        Expr::FunctionCall { args, .. } => args
2879            .iter()
2880            .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2881        Expr::Case {
2882            branches, else_, ..
2883        } => {
2884            branches.iter().any(|(cond, value)| {
2885                expr_references_update_column(cond, table_name, target_column)
2886                    || expr_references_update_column(value, table_name, target_column)
2887            }) || else_
2888                .as_deref()
2889                .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2890        }
2891        Expr::IsNull { operand, .. } => {
2892            expr_references_update_column(operand, table_name, target_column)
2893        }
2894        Expr::InList { target, values, .. } => {
2895            expr_references_update_column(target, table_name, target_column)
2896                || values
2897                    .iter()
2898                    .any(|value| expr_references_update_column(value, table_name, target_column))
2899        }
2900        Expr::Between {
2901            target, low, high, ..
2902        } => {
2903            expr_references_update_column(target, table_name, target_column)
2904                || expr_references_update_column(low, table_name, target_column)
2905                || expr_references_update_column(high, table_name, target_column)
2906        }
2907        Expr::WindowFunctionCall { args, window, .. } => {
2908            args.iter()
2909                .any(|arg| expr_references_update_column(arg, table_name, target_column))
2910                || window
2911                    .partition_by
2912                    .iter()
2913                    .any(|e| expr_references_update_column(e, table_name, target_column))
2914                || window
2915                    .order_by
2916                    .iter()
2917                    .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
2918        }
2919    }
2920}
2921
2922fn field_ref_matches_update_column(
2923    field: &FieldRef,
2924    table_name: &str,
2925    target_column: &str,
2926) -> bool {
2927    match field {
2928        FieldRef::TableColumn { table, column } => {
2929            column.eq_ignore_ascii_case(target_column)
2930                && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2931        }
2932        FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2933            false
2934        }
2935    }
2936}
2937
2938fn resolve_update_entity_by_logical_id(
2939    runtime: &RedDBRuntime,
2940    table: &str,
2941    logical_id: EntityId,
2942) -> Option<UnifiedEntity> {
2943    let store = runtime.inner.db.store();
2944    if let Some(entity) =
2945        crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2946            .resolve_logical_id(&store, table, logical_id)
2947    {
2948        return Some(entity);
2949    }
2950    // Fallback for non-table-row entities (graph nodes/edges, etc.) where
2951    // entity_id == logical_id and the MVCC table-row resolver doesn't apply.
2952    store.get(table, logical_id)
2953}
2954
2955fn update_cdc_item_kind(
2956    runtime: &RedDBRuntime,
2957    collection: &str,
2958    entity: &UnifiedEntity,
2959) -> &'static str {
2960    match &entity.data {
2961        EntityData::Node(_) => return "node",
2962        EntityData::Edge(_) => return "edge",
2963        _ => {}
2964    }
2965
2966    match runtime
2967        .db()
2968        .collection_contract(collection)
2969        .map(|contract| contract.declared_model)
2970    {
2971        Some(crate::catalog::CollectionModel::Document) => "document",
2972        Some(crate::catalog::CollectionModel::Kv)
2973        | Some(crate::catalog::CollectionModel::Vault) => "kv",
2974        _ => "row",
2975    }
2976}
2977
2978fn ordered_update_target_ids(
2979    manager: &Arc<crate::storage::SegmentManager>,
2980    entity_ids: &[EntityId],
2981    order_by: &[OrderByClause],
2982    limit: Option<usize>,
2983) -> Vec<EntityId> {
2984    let mut entities: Vec<UnifiedEntity> =
2985        manager.get_many(entity_ids).into_iter().flatten().collect();
2986    entities.sort_by(|left, right| compare_update_order(left, right, order_by));
2987    if let Some(limit) = limit {
2988        entities.truncate(limit);
2989    }
2990    entities.into_iter().map(|entity| entity.id).collect()
2991}
2992
2993fn compare_update_order(
2994    left: &UnifiedEntity,
2995    right: &UnifiedEntity,
2996    order_by: &[OrderByClause],
2997) -> Ordering {
2998    for clause in order_by {
2999        let left_value = update_order_value(left, &clause.field);
3000        let right_value = update_order_value(right, &clause.field);
3001        let ordering = compare_update_order_values(
3002            left_value.as_ref(),
3003            right_value.as_ref(),
3004            clause.nulls_first,
3005        );
3006        if ordering != Ordering::Equal {
3007            return if clause.ascending {
3008                ordering
3009            } else {
3010                ordering.reverse()
3011            };
3012        }
3013    }
3014    left.logical_id().raw().cmp(&right.logical_id().raw())
3015}
3016
3017fn compare_update_order_values(
3018    left: Option<&Value>,
3019    right: Option<&Value>,
3020    nulls_first: bool,
3021) -> Ordering {
3022    match (left, right) {
3023        (None, None) => Ordering::Equal,
3024        (None, Some(_)) => {
3025            if nulls_first {
3026                Ordering::Less
3027            } else {
3028                Ordering::Greater
3029            }
3030        }
3031        (Some(_), None) => {
3032            if nulls_first {
3033                Ordering::Greater
3034            } else {
3035                Ordering::Less
3036            }
3037        }
3038        (Some(left), Some(right)) => {
3039            crate::storage::query::value_compare::total_compare_values(left, right)
3040        }
3041    }
3042}
3043
3044fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3045    let FieldRef::TableColumn { table, column } = field else {
3046        return None;
3047    };
3048    if !table.is_empty() {
3049        return None;
3050    }
3051    if column.eq_ignore_ascii_case("rid") {
3052        return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3053    }
3054    match &entity.data {
3055        EntityData::Row(row) => row.get_field(column).cloned(),
3056        EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3057            .and_then(|record| record.get(column).cloned()),
3058        _ => None,
3059    }
3060}
3061
/// Remove ASCII-case-insensitive duplicates from `columns`.
///
/// The first spelling of each name is kept, and the original order is
/// preserved.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(columns.len());
    for name in columns {
        let already_seen = kept.iter().any(|seen| seen.eq_ignore_ascii_case(&name));
        if !already_seen {
            kept.push(name);
        }
    }
    kept
}
3078
3079// =============================================================================
3080// Helper functions for extracting typed values from column/value pairs
3081// =============================================================================
3082
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Map a SQL column name onto its canonical TTL metadata key.
///
/// Matching ignores ASCII case. The returned key is the lowercase entry
/// from `SQL_TTL_METADATA_COLUMNS`; non-TTL columns yield `None`.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|key| column.eq_ignore_ascii_case(key))
}
3096
3097/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
3098/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
3099/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
3100/// `_ttl_ms` and `_expires_at` are passed through.
3101fn canonicalize_sql_ttl_metadata(
3102    key: &'static str,
3103    value: MetadataValue,
3104) -> (&'static str, MetadataValue) {
3105    if key != "_ttl" {
3106        return (key, value);
3107    }
3108    let scaled = match value {
3109        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3110        MetadataValue::Timestamp(ms_or_s) => {
3111            // Timestamp is already chosen for very large values; treat as
3112            // already-ms to avoid silent overflow.
3113            MetadataValue::Timestamp(ms_or_s)
3114        }
3115        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3116        other => other,
3117    };
3118    ("_ttl_ms", scaled)
3119}
3120
3121/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
3122/// `SECRET('...')` literals. The runtime strips this marker and
3123/// applies the actual crypto transform during INSERT execution.
3124pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3125
3126impl RedDBRuntime {
3127    /// Strip the plaintext sentinel from a `Value::Password` or
3128    /// `Value::Secret` produced by the parser and apply the real
3129    /// crypto transform. `Password` is always hashed with argon2id.
3130    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
3131    /// when `red.config.secret.auto_encrypt = true` (default).
3132    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3133        match value {
3134            Value::Password(marked) => {
3135                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3136                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
3137                } else {
3138                    Ok(Value::Password(marked))
3139                }
3140            }
3141            Value::Secret(bytes) => {
3142                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3143                    if !self.secret_auto_encrypt() {
3144                        return Err(RedDBError::Query(
3145                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
3146                             is false. Insert pre-encrypted bytes directly instead."
3147                                .to_string(),
3148                        ));
3149                    }
3150                    let key = self.secret_aes_key().ok_or_else(|| {
3151                        RedDBError::Query(
3152                            "SECRET() column encryption requires a bootstrapped \
3153                             vault (red.secret.aes_key is missing). Start the server \
3154                             with --vault to enable."
3155                                .to_string(),
3156                        )
3157                    })?;
3158                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3159                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3160                } else {
3161                    Ok(Value::Secret(bytes))
3162                }
3163            }
3164            other => Ok(other),
3165        }
3166    }
3167}
3168
3169/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
3170/// This is the on-disk representation of `Value::Secret`.
3171fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3172    let nonce_bytes = crate::auth::store::random_bytes(12);
3173    let mut nonce = [0u8; 12];
3174    nonce.copy_from_slice(&nonce_bytes[..12]);
3175    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3176    let mut out = Vec::with_capacity(12 + ct.len());
3177    out.extend_from_slice(&nonce);
3178    out.extend_from_slice(&ct);
3179    out
3180}
3181
3182/// Decode a `Value::Secret` payload back to plaintext. Returns
3183/// `None` when the payload is too short or AES-GCM authentication
3184/// fails (tampered or wrong key).
3185pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3186    if payload.len() < 12 {
3187        return None;
3188    }
3189    let mut nonce = [0u8; 12];
3190    nonce.copy_from_slice(&payload[..12]);
3191    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3192}
3193
3194fn split_insert_metadata(
3195    runtime: &RedDBRuntime,
3196    columns: &[String],
3197    values: &[Value],
3198) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3199    let mut fields = Vec::new();
3200    let mut metadata = Vec::new();
3201
3202    for (column, value) in columns.iter().zip(values.iter()) {
3203        // Still support legacy _ttl columns for backward compat
3204        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3205            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3206            let (canonical_key, canonical_value) =
3207                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3208            metadata.push((canonical_key.to_string(), canonical_value));
3209            continue;
3210        }
3211        fields.push((
3212            column.clone(),
3213            runtime.resolve_crypto_sentinel(value.clone())?,
3214        ));
3215    }
3216
3217    Ok((fields, metadata))
3218}
3219
3220/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
3221fn merge_with_clauses(
3222    metadata: &mut Vec<(String, MetadataValue)>,
3223    ttl_ms: Option<u64>,
3224    expires_at_ms: Option<u64>,
3225    with_metadata: &[(String, Value)],
3226) {
3227    if let Some(ms) = ttl_ms {
3228        metadata.push((
3229            "_ttl_ms".to_string(),
3230            if ms <= i64::MAX as u64 {
3231                MetadataValue::Int(ms as i64)
3232            } else {
3233                MetadataValue::Timestamp(ms)
3234            },
3235        ));
3236    }
3237    if let Some(ms) = expires_at_ms {
3238        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3239    }
3240    for (key, value) in with_metadata {
3241        let meta_value = match value {
3242            Value::Text(s) => MetadataValue::String(s.to_string()),
3243            Value::Integer(n) => MetadataValue::Int(*n),
3244            Value::Float(n) => MetadataValue::Float(*n),
3245            Value::Boolean(b) => MetadataValue::Bool(*b),
3246            _ => MetadataValue::String(value.to_string()),
3247        };
3248        metadata.push((key.clone(), meta_value));
3249    }
3250}
3251
3252fn merge_vector_metadata_column(
3253    metadata: &mut Vec<(String, MetadataValue)>,
3254    columns: &[String],
3255    values: &[Value],
3256) -> RedDBResult<()> {
3257    let Some(value) = columns
3258        .iter()
3259        .position(|column| column.eq_ignore_ascii_case("metadata"))
3260        .map(|index| &values[index])
3261    else {
3262        return Ok(());
3263    };
3264    let json = match value {
3265        Value::Null => return Ok(()),
3266        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3267            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3268        })?,
3269        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3270            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3271        })?,
3272        other => {
3273            return Err(RedDBError::Query(format!(
3274                "column 'metadata' expected JSON object, got {other:?}"
3275            )))
3276        }
3277    };
3278    let parsed = metadata_from_json(&json)?;
3279    for (key, value) in parsed.iter() {
3280        metadata.push((key.clone(), value.clone()));
3281    }
3282    Ok(())
3283}
3284
3285fn apply_collection_default_ttl_metadata(
3286    runtime: &RedDBRuntime,
3287    collection: &str,
3288    metadata: &mut Vec<(String, MetadataValue)>,
3289) {
3290    if has_internal_ttl_metadata(metadata) {
3291        return;
3292    }
3293
3294    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3295        return;
3296    };
3297
3298    metadata.push((
3299        "_ttl_ms".to_string(),
3300        if default_ttl_ms <= i64::MAX as u64 {
3301            MetadataValue::Int(default_ttl_ms as i64)
3302        } else {
3303            MetadataValue::Timestamp(default_ttl_ms)
3304        },
3305    ));
3306}
3307
3308fn ensure_non_tree_reserved_metadata_entries(
3309    metadata: &[(String, MetadataValue)],
3310) -> RedDBResult<()> {
3311    for (key, _) in metadata {
3312        ensure_non_tree_reserved_metadata_key(key)?;
3313    }
3314    Ok(())
3315}
3316
3317fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3318    if key.starts_with(TREE_METADATA_PREFIX) {
3319        return Err(RedDBError::Query(format!(
3320            "metadata key '{}' is reserved for managed trees",
3321            key
3322        )));
3323    }
3324    Ok(())
3325}
3326
3327fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3328    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3329        return Err(RedDBError::Query(format!(
3330            "edge label '{}' is reserved for managed trees",
3331            TREE_CHILD_EDGE_LABEL
3332        )));
3333    }
3334    Ok(())
3335}
3336
3337fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3338    let mut columns = Vec::with_capacity(pairs.len());
3339    let mut values = Vec::with_capacity(pairs.len());
3340
3341    for (column, value) in pairs {
3342        columns.push(column.clone());
3343        values.push(value.clone());
3344    }
3345
3346    (columns, values)
3347}
3348
3349/// Find a required column value and return it as-is.
3350fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3351    for (i, col) in columns.iter().enumerate() {
3352        if col.eq_ignore_ascii_case(name) {
3353            return Ok(values[i].clone());
3354        }
3355    }
3356    Err(RedDBError::Query(format!(
3357        "required column '{name}' not found in INSERT"
3358    )))
3359}
3360
3361/// Find a required column value and coerce to String.
3362fn find_column_value_string(
3363    columns: &[String],
3364    values: &[Value],
3365    name: &str,
3366) -> RedDBResult<String> {
3367    let val = find_column_value(columns, values, name)?;
3368    match val {
3369        Value::Text(s) => Ok(s.to_string()),
3370        Value::Integer(n) => Ok(n.to_string()),
3371        Value::Float(n) => Ok(n.to_string()),
3372        other => Err(RedDBError::Query(format!(
3373            "column '{name}' expected text, got {other:?}"
3374        ))),
3375    }
3376}
3377
3378fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3379    let val = find_column_value(columns, values, name)?;
3380    match val {
3381        Value::Float(n) => Ok(n),
3382        Value::Integer(n) => Ok(n as f64),
3383        Value::UnsignedInteger(n) => Ok(n as f64),
3384        Value::Text(s) => s
3385            .parse::<f64>()
3386            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3387        other => Err(RedDBError::Query(format!(
3388            "column '{name}' expected number, got {other:?}"
3389        ))),
3390    }
3391}
3392
3393/// Find an optional column value as String.
3394fn find_column_value_opt_string(
3395    columns: &[String],
3396    values: &[Value],
3397    name: &str,
3398) -> Option<String> {
3399    for (i, col) in columns.iter().enumerate() {
3400        if col.eq_ignore_ascii_case(name) {
3401            return match &values[i] {
3402                Value::Null => None,
3403                Value::Text(s) => Some(s.to_string()),
3404                Value::Integer(n) => Some(n.to_string()),
3405                Value::Float(n) => Some(n.to_string()),
3406                _ => None,
3407            };
3408        }
3409    }
3410    None
3411}
3412
3413/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
3414///
3415/// Accepts integer literals, decimal strings, and node labels resolved via
3416/// the per-collection graph label index (same source of truth that
3417/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
3418/// labels error so callers can fall back to the numeric id form.
3419fn resolve_edge_endpoint(
3420    store: &crate::storage::unified::UnifiedStore,
3421    collection: &str,
3422    columns: &[String],
3423    values: &[Value],
3424    name: &str,
3425) -> RedDBResult<u64> {
3426    let val = find_column_value(columns, values, name)?;
3427    match val {
3428        Value::Integer(n) => Ok(n as u64),
3429        Value::UnsignedInteger(n) => Ok(n),
3430        Value::Text(s) => {
3431            if let Ok(n) = s.parse::<u64>() {
3432                return Ok(n);
3433            }
3434            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3435            match matches.len() {
3436                0 => Err(RedDBError::Query(format!(
3437                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3438                ))),
3439                1 => Ok(matches[0].raw()),
3440                n => Err(RedDBError::Query(format!(
3441                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3442                ))),
3443            }
3444        }
3445        other => Err(RedDBError::Query(format!(
3446            "column '{name}' expected integer or node label, got {other:?}"
3447        ))),
3448    }
3449}
3450
3451fn resolve_edge_endpoint_any(
3452    store: &crate::storage::unified::UnifiedStore,
3453    collection: &str,
3454    columns: &[String],
3455    values: &[Value],
3456    names: &[&str],
3457) -> RedDBResult<u64> {
3458    for name in names {
3459        if columns
3460            .iter()
3461            .any(|column| column.eq_ignore_ascii_case(name))
3462        {
3463            return resolve_edge_endpoint(store, collection, columns, values, name);
3464        }
3465    }
3466
3467    Err(RedDBError::Query(format!(
3468        "required column '{}' not found in INSERT",
3469        names.first().copied().unwrap_or("from_rid")
3470    )))
3471}
3472
3473/// Find a required column value and coerce to u64.
3474fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3475    let val = find_column_value(columns, values, name)?;
3476    match val {
3477        Value::Integer(n) => Ok(n as u64),
3478        Value::UnsignedInteger(n) => Ok(n),
3479        Value::Text(s) => s
3480            .parse::<u64>()
3481            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3482        other => Err(RedDBError::Query(format!(
3483            "column '{name}' expected integer, got {other:?}"
3484        ))),
3485    }
3486}
3487
3488/// Find an optional column value as f32.
3489fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3490    for (i, col) in columns.iter().enumerate() {
3491        if col.eq_ignore_ascii_case(name) {
3492            return match &values[i] {
3493                Value::Float(n) => Some(*n as f32),
3494                Value::Integer(n) => Some(*n as f32),
3495                Value::Null => None,
3496                _ => None,
3497            };
3498        }
3499    }
3500    None
3501}
3502
3503/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
3504fn find_column_value_vec_f32(
3505    columns: &[String],
3506    values: &[Value],
3507    name: &str,
3508) -> RedDBResult<Vec<f32>> {
3509    let val = find_column_value(columns, values, name)?;
3510    match val {
3511        Value::Vector(v) => Ok(v),
3512        Value::Json(bytes) => {
3513            // Try to parse as JSON array of numbers
3514            let s = std::str::from_utf8(&bytes).map_err(|_| {
3515                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3516            })?;
3517            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3518                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3519            })?;
3520            Ok(arr)
3521        }
3522        other => Err(RedDBError::Query(format!(
3523            "column '{name}' expected vector, got {other:?}"
3524        ))),
3525    }
3526}
3527
3528fn find_column_value_vec_f32_any(
3529    columns: &[String],
3530    values: &[Value],
3531    names: &[&str],
3532) -> RedDBResult<Vec<f32>> {
3533    for name in names {
3534        if columns
3535            .iter()
3536            .any(|column| column.eq_ignore_ascii_case(name))
3537        {
3538            return find_column_value_vec_f32(columns, values, name);
3539        }
3540    }
3541    Err(RedDBError::Query(format!(
3542        "required vector column '{}' not found in INSERT",
3543        names.join("' or '")
3544    )))
3545}
3546
3547/// Extract remaining properties (all columns not in the exclusion list).
3548fn extract_remaining_properties(
3549    columns: &[String],
3550    values: &[Value],
3551    exclude: &[&str],
3552) -> Vec<(String, Value)> {
3553    columns
3554        .iter()
3555        .zip(values.iter())
3556        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3557        .map(|(col, val)| (col.clone(), val.clone()))
3558        .collect()
3559}
3560
3561fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3562    let mut invalid = Vec::new();
3563    for column in columns {
3564        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3565            invalid.push(column.clone());
3566        }
3567    }
3568
3569    if invalid.is_empty() {
3570        Ok(())
3571    } else {
3572        Err(RedDBError::Query(format!(
3573            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3574            invalid.join(", ")
3575        )))
3576    }
3577}
3578
/// `true` when `column` (ASCII case-insensitive) is a data column that a
/// timeseries INSERT accepts.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        // Analytics-event extension (#577): an analytics row carries
        // an `event_name` + JSON `payload`. The payload is validated
        // against the AnalyticsSchemaRegistry inside
        // `insert_timeseries_point` before the row lands.
        "event_name",
        "payload",
    ];
    ACCEPTED
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
3596
3597fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3598    let mut found = None;
3599
3600    for alias in ["timestamp_ns", "timestamp", "time"] {
3601        for (index, column) in columns.iter().enumerate() {
3602            if !column.eq_ignore_ascii_case(alias) {
3603                continue;
3604            }
3605
3606            if found.is_some() {
3607                return Err(RedDBError::Query(
3608                    "timeseries INSERT accepts only one timestamp column".to_string(),
3609                ));
3610            }
3611
3612            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3613        }
3614    }
3615
3616    Ok(found)
3617}
3618
3619fn find_timeseries_tags(
3620    columns: &[String],
3621    values: &[Value],
3622) -> RedDBResult<std::collections::HashMap<String, String>> {
3623    for (index, column) in columns.iter().enumerate() {
3624        if column.eq_ignore_ascii_case("tags") {
3625            return parse_timeseries_tags(&values[index]);
3626        }
3627    }
3628    Ok(std::collections::HashMap::new())
3629}
3630
3631fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3632    match value {
3633        Value::Null => Ok(std::collections::HashMap::new()),
3634        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3635        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3636        other => Err(RedDBError::Query(format!(
3637            "timeseries tags must be a JSON object or JSON text, got {other:?}"
3638        ))),
3639    }
3640}
3641
3642fn parse_timeseries_tags_json(
3643    bytes: &[u8],
3644) -> RedDBResult<std::collections::HashMap<String, String>> {
3645    let json: crate::json::Value = crate::json::from_slice(bytes)
3646        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3647
3648    let object = match json {
3649        crate::json::Value::Object(object) => object,
3650        other => {
3651            return Err(RedDBError::Query(format!(
3652                "timeseries tags must be a JSON object, got {other:?}"
3653            )))
3654        }
3655    };
3656
3657    let mut tags = std::collections::HashMap::with_capacity(object.len());
3658    for (key, value) in object {
3659        tags.insert(key, json_tag_value_to_string(&value));
3660    }
3661    Ok(tags)
3662}
3663
3664/// Encode a tag value for storage so the original JSON type can be
3665/// recovered on read (issue #543).
3666///
3667/// Time-series tags are stored as `HashMap<String, String>` on the
3668/// physical record (see [`crate::storage::TimeSeriesData`]) so that
3669/// the segment codec, WAL and gRPC mirrors don't need a new value
3670/// variant. To preserve the original JSON type across that
3671/// string-only channel we prepend the
3672/// [`crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX`] marker
3673/// and serialize the value as compact JSON text. The read paths
3674/// (`timeseries_tags_json_value` / `timeseries_tags_value`) detect
3675/// the marker, parse the suffix, and recover a real JSON value.
3676/// Tags written through other channels (Prometheus remote write,
3677/// metrics handlers, legacy on-disk data) lack the marker and are
3678/// returned as `JsonValue::String(raw)` exactly as before.
3679fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3680    let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3681    buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3682    buf.push_str(&value.to_string_compact());
3683    buf
3684}
3685
3686fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3687    match value {
3688        Value::UnsignedInteger(value) => Ok(*value),
3689        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3690        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3691        Value::Text(value) => value.parse::<u64>().map_err(|_| {
3692            RedDBError::Query(format!(
3693                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3694            ))
3695        }),
3696        other => Err(RedDBError::Query(format!(
3697            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3698        ))),
3699    }
3700}
3701
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// A clock reading before the epoch yields `0`. A nanosecond count that
/// does not fit in `u64` is clamped to `u64::MAX`.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
3709
3710fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3711    use crate::json::{Map, Value as JV};
3712    match value {
3713        MetadataValue::Null => JV::Null,
3714        MetadataValue::Bool(value) => JV::Bool(*value),
3715        MetadataValue::Int(value) => JV::Number(*value as f64),
3716        MetadataValue::Float(value) => JV::Number(*value),
3717        MetadataValue::String(value) => JV::String(value.clone()),
3718        MetadataValue::Bytes(value) => JV::Array(
3719            value
3720                .iter()
3721                .map(|value| JV::Number(*value as f64))
3722                .collect(),
3723        ),
3724        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3725        MetadataValue::Array(values) => {
3726            JV::Array(values.iter().map(metadata_value_to_json).collect())
3727        }
3728        MetadataValue::Object(object) => {
3729            let entries = object
3730                .iter()
3731                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3732                .collect();
3733            JV::Object(entries)
3734        }
3735        MetadataValue::Geo { lat, lon } => {
3736            let mut object = Map::new();
3737            object.insert("lat".to_string(), JV::Number(*lat));
3738            object.insert("lon".to_string(), JV::Number(*lon));
3739            JV::Object(object)
3740        }
3741        MetadataValue::Reference(target) => {
3742            let mut object = Map::new();
3743            object.insert(
3744                "collection".to_string(),
3745                JV::String(target.collection().to_string()),
3746            );
3747            object.insert(
3748                "entity_id".to_string(),
3749                JV::Number(target.entity_id().raw() as f64),
3750            );
3751            JV::Object(object)
3752        }
3753        MetadataValue::References(values) => {
3754            let refs = values
3755                .iter()
3756                .map(|target| {
3757                    let mut object = Map::new();
3758                    object.insert(
3759                        "collection".to_string(),
3760                        JV::String(target.collection().to_string()),
3761                    );
3762                    object.insert(
3763                        "entity_id".to_string(),
3764                        JV::Number(target.entity_id().raw() as f64),
3765                    );
3766                    JV::Object(object)
3767                })
3768                .collect();
3769            JV::Array(refs)
3770        }
3771    }
3772}
3773
3774fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3775    match value {
3776        Value::Null => MetadataValue::Null,
3777        Value::Boolean(value) => MetadataValue::Bool(*value),
3778        Value::Integer(value) => MetadataValue::Int(*value),
3779        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3780        Value::Float(value) => MetadataValue::Float(*value),
3781        Value::Text(value) => MetadataValue::String(value.to_string()),
3782        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3783        Value::Timestamp(value) => {
3784            if *value >= 0 {
3785                metadata_u64_to_value(*value as u64)
3786            } else {
3787                MetadataValue::Int(*value)
3788            }
3789        }
3790        Value::TimestampMs(value) => {
3791            if *value >= 0 {
3792                metadata_u64_to_value(*value as u64)
3793            } else {
3794                MetadataValue::Int(*value)
3795            }
3796        }
3797        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3798        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3799        Value::Date(value) => MetadataValue::String(value.to_string()),
3800        Value::Time(value) => MetadataValue::String(value.to_string()),
3801        Value::Decimal(value) => MetadataValue::String(value.to_string()),
3802        Value::Ipv4(value) => MetadataValue::String(format!(
3803            "{}.{}.{}.{}",
3804            (value >> 24) & 0xFF,
3805            (value >> 16) & 0xFF,
3806            (value >> 8) & 0xFF,
3807            value & 0xFF
3808        )),
3809        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3810        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3811        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3812        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3813            lat: *lat as f64 / 1_000_000.0,
3814            lon: *lon as f64 / 1_000_000.0,
3815        },
3816        Value::BigInt(value) => MetadataValue::String(value.to_string()),
3817        Value::TableRef(value) => MetadataValue::String(value.clone()),
3818        Value::PageRef(value) => MetadataValue::Int(*value as i64),
3819        Value::Password(value) => MetadataValue::String(value.clone()),
3820        Value::Array(values) => {
3821            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3822        }
3823        _ => MetadataValue::String(value.to_string()),
3824    }
3825}
3826
3827fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3828    match value {
3829        Value::Null => Ok(MetadataValue::Null),
3830        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3831        Value::Integer(_) => Err(RedDBError::Query(format!(
3832            "column '{field}' must be non-negative for TTL metadata"
3833        ))),
3834        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3835        Value::Float(value) if value.is_finite() => {
3836            if value.fract().abs() >= f64::EPSILON {
3837                return Err(RedDBError::Query(format!(
3838                    "column '{field}' must be an integer (TTL metadata must be an integer)"
3839                )));
3840            }
3841            if *value < 0.0 {
3842                return Err(RedDBError::Query(format!(
3843                    "column '{field}' must be non-negative for TTL metadata"
3844                )));
3845            }
3846            if *value > u64::MAX as f64 {
3847                return Err(RedDBError::Query(format!(
3848                    "column '{field}' value is too large"
3849                )));
3850            }
3851            Ok(metadata_u64_to_value(*value as u64))
3852        }
3853        Value::Float(_) => Err(RedDBError::Query(format!(
3854            "column '{field}' must be a finite number"
3855        ))),
3856        Value::Text(value) => {
3857            let value = value.trim();
3858            if let Ok(value) = value.parse::<u64>() {
3859                Ok(metadata_u64_to_value(value))
3860            } else if let Ok(value) = value.parse::<i64>() {
3861                if value < 0 {
3862                    return Err(RedDBError::Query(format!(
3863                        "column '{field}' must be non-negative for TTL metadata"
3864                    )));
3865                }
3866                Ok(metadata_u64_to_value(value as u64))
3867            } else if let Ok(value) = value.parse::<f64>() {
3868                if !value.is_finite() {
3869                    return Err(RedDBError::Query(format!(
3870                        "column '{field}' must be a finite number"
3871                    )));
3872                }
3873                if value.fract().abs() >= f64::EPSILON {
3874                    return Err(RedDBError::Query(format!(
3875                        "column '{field}' must be an integer (TTL metadata must be an integer)"
3876                    )));
3877                }
3878                if value < 0.0 {
3879                    return Err(RedDBError::Query(format!(
3880                        "column '{field}' must be non-negative for TTL metadata"
3881                    )));
3882                }
3883                if value > u64::MAX as f64 {
3884                    return Err(RedDBError::Query(format!(
3885                        "column '{field}' value is too large"
3886                    )));
3887                }
3888                Ok(metadata_u64_to_value(value as u64))
3889            } else {
3890                Err(RedDBError::Query(format!(
3891                    "column '{field}' expects a numeric value for TTL metadata"
3892                )))
3893            }
3894        }
3895        _ => Err(RedDBError::Query(format!(
3896            "column '{field}' expects a numeric value for TTL metadata"
3897        ))),
3898    }
3899}
3900
3901fn metadata_u64_to_value(value: u64) -> MetadataValue {
3902    if value <= i64::MAX as u64 {
3903        MetadataValue::Int(value as i64)
3904    } else {
3905        MetadataValue::Timestamp(value)
3906    }
3907}
3908
3909/// Phase 2 PG parity: inspect a column value and return `true` when
3910/// the dotted `tail` path is already present under it. Used by the
3911/// tenant auto-fill so rows that already carry an explicit value
3912/// (bulk import, admin insert on behalf of a tenant) are not
3913/// double-stamped with the session's current_tenant().
3914fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3915    let json = match value {
3916        Value::Null => return false,
3917        Value::Json(bytes) | Value::Blob(bytes) => {
3918            match crate::json::from_slice::<crate::json::Value>(bytes) {
3919                Ok(v) => v,
3920                Err(_) => return false,
3921            }
3922        }
3923        Value::Text(s) => {
3924            let trimmed = s.trim_start();
3925            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3926                return false;
3927            }
3928            match crate::json::from_str::<crate::json::Value>(s) {
3929                Ok(v) => v,
3930                Err(_) => return false,
3931            }
3932        }
3933        _ => return false,
3934    };
3935    let mut cursor = &json;
3936    for seg in tail.split('.') {
3937        match cursor {
3938            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3939                Some((_, v)) => cursor = v,
3940                None => return false,
3941            },
3942            _ => return false,
3943        }
3944    }
3945    !matches!(cursor, crate::json::Value::Null)
3946}
3947
3948/// Phase 2 PG parity: take a column value (possibly Null / Text /
3949/// Json) and return a `Value::Json` with the dotted `tail` path set
3950/// to `tenant_id`. Preserves every pre-existing key.
3951///
3952/// Accepts:
3953/// * `Value::Null`  → fresh `{tail: tenant_id}` object
3954/// * `Value::Json(bytes)` → parse, navigate / create path, re-serialize
3955/// * `Value::text(s)` if `s` is valid JSON → same as Json
3956/// * anything else → error (user supplied a scalar where we need
3957///   a JSON container)
3958fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
3959    let mut root = match current {
3960        Value::Null => crate::json::Value::Object(Default::default()),
3961        Value::Json(bytes) | Value::Blob(bytes) => {
3962            crate::json::from_slice(&bytes).map_err(|err| {
3963                RedDBError::Query(format!(
3964                    "tenant auto-fill: root column is not valid JSON ({err})"
3965                ))
3966            })?
3967        }
3968        Value::Text(s) => {
3969            if s.trim().is_empty() {
3970                crate::json::Value::Object(Default::default())
3971            } else {
3972                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
3973                    RedDBError::Query(format!(
3974                        "tenant auto-fill: text root is not valid JSON ({err})"
3975                    ))
3976                })?
3977            }
3978        }
3979        other => {
3980            return Err(RedDBError::Query(format!(
3981                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
3982            )));
3983        }
3984    };
3985
3986    // Navigate path segments, creating intermediate objects on demand.
3987    let segments: Vec<&str> = tail.split('.').collect();
3988    let mut cursor: &mut crate::json::Value = &mut root;
3989    for (i, seg) in segments.iter().enumerate() {
3990        let is_last = i + 1 == segments.len();
3991        let map = match cursor {
3992            crate::json::Value::Object(m) => m,
3993            _ => {
3994                return Err(RedDBError::Query(format!(
3995                    "tenant auto-fill: segment '{seg}' is not inside an object"
3996                )));
3997            }
3998        };
3999        if is_last {
4000            map.insert(
4001                seg.to_string(),
4002                crate::json::Value::String(tenant_id.to_string()),
4003            );
4004            break;
4005        }
4006        cursor = map
4007            .entry(seg.to_string())
4008            .or_insert_with(|| crate::json::Value::Object(Default::default()));
4009    }
4010
4011    let bytes = crate::json::to_vec(&root).map_err(|err| {
4012        RedDBError::Query(format!(
4013            "tenant auto-fill: failed to re-serialize JSON ({err})"
4014        ))
4015    })?;
4016    Ok(Value::Json(bytes))
4017}
4018
4019#[cfg(test)]
4020mod tests {
4021    use crate::storage::schema::Value;
4022    use crate::storage::wal::{WalReader, WalRecord};
4023    use crate::{RedDBOptions, RedDBRuntime};
4024    use std::path::Path;
4025
4026    fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4027        WalReader::open(wal_path)
4028            .expect("wal opens")
4029            .iter()
4030            .map(|record| record.expect("wal record decodes").1)
4031            .filter_map(|record| match record {
4032                WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4033                _ => None,
4034            })
4035            .collect()
4036    }
4037
4038    fn action_contains_text(action: &[u8], needle: &str) -> bool {
4039        action
4040            .windows(needle.len())
4041            .any(|window| window == needle.as_bytes())
4042    }
4043
4044    fn assert_statement_writes_collections_in_one_new_wal_batch(
4045        rt: &RedDBRuntime,
4046        wal_path: &Path,
4047        statement: &str,
4048        source: &str,
4049        event_queue: &str,
4050    ) {
4051        let before_batches = store_commit_batches(wal_path).len();
4052
4053        rt.execute_query(statement).unwrap();
4054
4055        let batches = store_commit_batches(wal_path);
4056        let statement_batches = &batches[before_batches..];
4057        let source_batch = statement_batches
4058            .iter()
4059            .position(|actions| {
4060                actions.iter().any(|action| {
4061                    action_contains_text(action, source)
4062                        && !action_contains_text(action, event_queue)
4063                })
4064            })
4065            .expect("source collection write batch is present");
4066        let event_batch = statement_batches
4067            .iter()
4068            .position(|actions| {
4069                actions
4070                    .iter()
4071                    .any(|action| action_contains_text(action, event_queue))
4072            })
4073            .expect("event queue write batch is present");
4074
4075        assert_eq!(
4076            source_batch, event_batch,
4077            "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4078        );
4079    }
4080
4081    #[test]
4082    fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4083        let dir = tempfile::tempdir().unwrap();
4084        let db_path = dir.path().join("events_dual_write.rdb");
4085        let wal_path = db_path.with_extension("rdb-uwal");
4086        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4087
4088        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4089            .unwrap();
4090        assert_statement_writes_collections_in_one_new_wal_batch(
4091            &rt,
4092            &wal_path,
4093            "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4094            "users",
4095            "users_events",
4096        );
4097    }
4098
4099    #[test]
4100    fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4101        let dir = tempfile::tempdir().unwrap();
4102        let db_path = dir.path().join("events_update_atomic.rdb");
4103        let wal_path = db_path.with_extension("rdb-uwal");
4104        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4105
4106        rt.execute_query(
4107            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4108        )
4109        .unwrap();
4110        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4111            .unwrap();
4112
4113        assert_statement_writes_collections_in_one_new_wal_batch(
4114            &rt,
4115            &wal_path,
4116            "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4117            "users",
4118            "user_updates",
4119        );
4120    }
4121
4122    #[test]
4123    fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4124        let dir = tempfile::tempdir().unwrap();
4125        let db_path = dir.path().join("events_delete_atomic.rdb");
4126        let wal_path = db_path.with_extension("rdb-uwal");
4127        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4128
4129        rt.execute_query(
4130            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4131        )
4132        .unwrap();
4133        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4134            .unwrap();
4135
4136        assert_statement_writes_collections_in_one_new_wal_batch(
4137            &rt,
4138            &wal_path,
4139            "DELETE FROM users WHERE id = 1",
4140            "users",
4141            "user_deletes",
4142        );
4143    }
4144
4145    #[test]
4146    fn update_where_id_in_with_hash_index_updates_expected_rows() {
4147        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4148        rt.execute_query("CREATE TABLE users (id INT, score INT)")
4149            .unwrap();
4150        for id in 0..5 {
4151            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4152                .unwrap();
4153        }
4154        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4155            .unwrap();
4156
4157        let updated = rt
4158            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4159            .unwrap();
4160        assert_eq!(updated.affected_rows, 3);
4161
4162        let selected = rt
4163            .execute_query("SELECT id, score FROM users ORDER BY id")
4164            .unwrap();
4165        let scores: Vec<(i64, i64)> = selected
4166            .result
4167            .records
4168            .iter()
4169            .map(|record| {
4170                let id = match record.get("id").unwrap() {
4171                    Value::Integer(value) => *value,
4172                    other => panic!("expected integer id, got {other:?}"),
4173                };
4174                let score = match record.get("score").unwrap() {
4175                    Value::Integer(value) => *value,
4176                    other => panic!("expected integer score, got {other:?}"),
4177                };
4178                (id, score)
4179            })
4180            .collect();
4181        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4182    }
4183
4184    /// Drives UPDATE through the shared `DmlTargetScan` module — the
4185    /// same code path DELETE uses (#51, #52). Exercises the indexed
4186    /// equality fast-path (WHERE id = N with a HASH index), the
4187    /// unindexed range scan (WHERE score > N), and the no-WHERE
4188    /// full-scan branch to confirm the extracted "find target rows"
4189    /// loop preserves affected-row counts and the resulting row state.
4190    #[test]
4191    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4192        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4193        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4194            .unwrap();
4195        for id in 0..5 {
4196            rt.execute_query(&format!(
4197                "INSERT INTO items (id, score) VALUES ({id}, {})",
4198                id * 10
4199            ))
4200            .unwrap();
4201        }
4202        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4203            .unwrap();
4204
4205        // Indexed equality UPDATE — hits the hash fast-path inside
4206        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
4207        // below the score>25 cutoff so the next assertion stays clean.
4208        let updated_one = rt
4209            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4210            .unwrap();
4211        assert_eq!(updated_one.affected_rows, 1);
4212
4213        // Unindexed scan UPDATE — bumps everyone with score > 25,
4214        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4215        // zoned/full-scan branch.
4216        let updated_many = rt
4217            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4218            .unwrap();
4219        assert_eq!(updated_many.affected_rows, 2);
4220
4221        let snapshot = rt
4222            .execute_query("SELECT id, score FROM items ORDER BY id")
4223            .unwrap();
4224        let pairs: Vec<(i64, i64)> = snapshot
4225            .result
4226            .records
4227            .iter()
4228            .map(|record| {
4229                let id = match record.get("id").unwrap() {
4230                    Value::Integer(value) => *value,
4231                    other => panic!("expected integer id, got {other:?}"),
4232                };
4233                let score = match record.get("score").unwrap() {
4234                    Value::Integer(value) => *value,
4235                    other => panic!("expected integer score, got {other:?}"),
4236                };
4237                (id, score)
4238            })
4239            .collect();
4240        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4241
4242        // Full-scan UPDATE with no WHERE rewrites every remaining row.
4243        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4244        assert_eq!(updated_all.affected_rows, 5);
4245        let after = rt
4246            .execute_query("SELECT score FROM items ORDER BY id")
4247            .unwrap();
4248        let scores: Vec<i64> = after
4249            .result
4250            .records
4251            .iter()
4252            .map(|record| match record.get("score").unwrap() {
4253                Value::Integer(value) => *value,
4254                other => panic!("expected integer score, got {other:?}"),
4255            })
4256            .collect();
4257        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4258    }
4259
4260    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
4261    /// both the index fast-path (WHERE id = N with a HASH index) and
4262    /// the unindexed scan path (WHERE score > N) to confirm the
4263    /// extracted "find target rows" loop preserves the affected-row
4264    /// count and which rows survive.
4265    #[test]
4266    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4267        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4268        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4269            .unwrap();
4270        for id in 0..5 {
4271            rt.execute_query(&format!(
4272                "INSERT INTO items (id, score) VALUES ({id}, {})",
4273                id * 10
4274            ))
4275            .unwrap();
4276        }
4277        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4278            .unwrap();
4279
4280        // Indexed equality DELETE — hits the hash fast-path inside
4281        // DmlTargetScan::find_target_ids.
4282        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4283        assert_eq!(deleted_one.affected_rows, 1);
4284
4285        // Unindexed scan DELETE — drops everyone with score > 25,
4286        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4287        // zoned/full-scan branch.
4288        let deleted_many = rt
4289            .execute_query("DELETE FROM items WHERE score > 25")
4290            .unwrap();
4291        assert_eq!(deleted_many.affected_rows, 2);
4292
4293        let surviving = rt
4294            .execute_query("SELECT id FROM items ORDER BY id")
4295            .unwrap();
4296        let ids: Vec<i64> = surviving
4297            .result
4298            .records
4299            .iter()
4300            .map(|record| match record.get("id").unwrap() {
4301                Value::Integer(value) => *value,
4302                other => panic!("expected integer id, got {other:?}"),
4303            })
4304            .collect();
4305        assert_eq!(ids, vec![0, 1]);
4306
4307        // Sanity: full-scan DELETE with no WHERE clears the rest.
4308        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4309        assert_eq!(deleted_rest.affected_rows, 2);
4310        let empty = rt.execute_query("SELECT id FROM items").unwrap();
4311        assert!(empty.result.records.is_empty());
4312    }
4313
4314    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
4315    /// INSERT but reject UPDATE and DELETE with the documented
4316    /// operator-facing error strings. Drives all three DML verbs so
4317    /// the centralized gate is exercised end-to-end.
4318    #[test]
4319    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4320        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4321        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4322            .unwrap();
4323
4324        // INSERT must succeed — APPEND ONLY exists precisely to allow
4325        // appends. The gate should be a no-op for INSERT.
4326        let inserted = rt
4327            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4328            .unwrap();
4329        assert_eq!(inserted.affected_rows, 1);
4330
4331        // UPDATE is rejected with the gate's UPDATE-specific message.
4332        let update_err = rt
4333            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4334            .unwrap_err();
4335        let msg = format!("{update_err}");
4336        assert!(
4337            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4338            "expected UPDATE rejection message, got: {msg}"
4339        );
4340
4341        // DELETE is rejected with the gate's DELETE-specific message.
4342        let delete_err = rt
4343            .execute_query("DELETE FROM events WHERE id = 1")
4344            .unwrap_err();
4345        let msg = format!("{delete_err}");
4346        assert!(
4347            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4348            "expected DELETE rejection message, got: {msg}"
4349        );
4350
4351        // Row should still be present — neither rejected mutation
4352        // touched storage.
4353        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4354        assert_eq!(surviving.result.records.len(), 1);
4355    }
4356
4357    /// CollectionContract gate: tables without an APPEND ONLY contract
4358    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
4359    /// is a true pass-through, not an accidental block.
4360    #[test]
4361    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4362        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4363        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4364            .unwrap();
4365
4366        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4367            .unwrap();
4368        let updated = rt
4369            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4370            .unwrap();
4371        assert_eq!(updated.affected_rows, 1);
4372        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4373        assert_eq!(deleted.affected_rows, 1);
4374    }
4375
4376    #[test]
4377    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4378        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4379        rt.execute_query(
4380            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4381        )
4382        .unwrap();
4383
4384        let inserted = rt
4385            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4386            .unwrap();
4387        assert_eq!(inserted.affected_rows, 1);
4388
4389        let events = queue_payloads(&rt, "audit_log");
4390        assert_eq!(events.len(), 1);
4391        let event = events[0].as_object().expect("event payload object");
4392        assert!(event
4393            .get("event_id")
4394            .and_then(crate::json::Value::as_str)
4395            .is_some_and(|value| !value.is_empty()));
4396        assert_eq!(
4397            event.get("op").and_then(crate::json::Value::as_str),
4398            Some("insert")
4399        );
4400        assert_eq!(
4401            event.get("collection").and_then(crate::json::Value::as_str),
4402            Some("users")
4403        );
4404        assert_eq!(
4405            event.get("id").and_then(crate::json::Value::as_u64),
4406            Some(7)
4407        );
4408        assert!(event
4409            .get("ts")
4410            .and_then(crate::json::Value::as_u64)
4411            .is_some());
4412        assert!(event
4413            .get("lsn")
4414            .and_then(crate::json::Value::as_u64)
4415            .is_some());
4416        assert!(matches!(
4417            event.get("tenant"),
4418            Some(crate::json::Value::Null)
4419        ));
4420        assert!(matches!(
4421            event.get("before"),
4422            Some(crate::json::Value::Null)
4423        ));
4424        let after = event
4425            .get("after")
4426            .and_then(crate::json::Value::as_object)
4427            .expect("after object");
4428        assert_eq!(
4429            after.get("id").and_then(crate::json::Value::as_u64),
4430            Some(7)
4431        );
4432        assert_eq!(
4433            after.get("email").and_then(crate::json::Value::as_str),
4434            Some("a@example.com")
4435        );
4436    }
4437
4438    #[test]
4439    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4440        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4441        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4442            .unwrap();
4443
4444        rt.execute_query(
4445            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4446        )
4447        .unwrap();
4448
4449        let events = queue_payloads(&rt, "users_events");
4450        assert_eq!(events.len(), 2);
4451        let mut previous_lsn = 0;
4452        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4453            let object = event.as_object().expect("event payload object");
4454            assert_eq!(
4455                object.get("op").and_then(crate::json::Value::as_str),
4456                Some("insert")
4457            );
4458            assert_eq!(
4459                object.get("id").and_then(crate::json::Value::as_u64),
4460                Some(expected_id)
4461            );
4462            let lsn = object
4463                .get("lsn")
4464                .and_then(crate::json::Value::as_u64)
4465                .expect("event lsn");
4466            assert!(
4467                lsn > previous_lsn,
4468                "event LSNs should increase in row order"
4469            );
4470            previous_lsn = lsn;
4471            let after = object
4472                .get("after")
4473                .and_then(crate::json::Value::as_object)
4474                .expect("after object");
4475            assert_eq!(
4476                after.get("id").and_then(crate::json::Value::as_u64),
4477                Some(expected_id)
4478            );
4479        }
4480    }
4481
4482    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4483        let result = rt
4484            .execute_query(&format!("QUEUE PEEK {queue} 10"))
4485            .expect("peek queue");
4486        result
4487            .result
4488            .records
4489            .iter()
4490            .map(
4491                |record| match record.get("payload").expect("payload column") {
4492                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4493                    other => panic!("expected JSON queue payload, got {other:?}"),
4494                },
4495            )
4496            .collect()
4497    }
4498
4499    // ── #112: auto-index user `id` on first insert ─────────────────────
4500
4501    /// First insert into a fresh collection that carries a column named
4502    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
4503    /// populate it transparently, and `WHERE id = N` lookups exercise
4504    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
4505    ///
4506    /// This is the load-bearing acceptance test for #112 — without the
4507    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
4508    /// fall through to a full segment scan (the 4× perf gap documented
4509    /// in `docs/perf/delete-sequential-2026-05-06.md`).
4510    #[test]
4511    fn auto_index_id_fires_on_first_insert() {
4512        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4513        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4514            .unwrap();
4515
4516        // Pre-condition: no index on `id` yet.
4517        assert!(
4518            rt.index_store_ref()
4519                .find_index_for_column("bench_users", "id")
4520                .is_none(),
4521            "freshly created collection should not have an `id` index"
4522        );
4523
4524        // Single-row INSERT — drives `MutationEngine::append_one`.
4525        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4526            .unwrap();
4527
4528        // Post-condition: hash index registered on `id`.
4529        let registered = rt
4530            .index_store_ref()
4531            .find_index_for_column("bench_users", "id")
4532            .expect("auto-index hook should have registered idx_id on first insert");
4533        assert_eq!(registered.name, "idx_id");
4534        assert_eq!(registered.collection, "bench_users");
4535        assert_eq!(registered.columns, vec!["id".to_string()]);
4536        assert!(matches!(
4537            registered.method,
4538            super::super::index_store::IndexMethodKind::Hash
4539        ));
4540
4541        // Subsequent inserts populate the index; `WHERE id = N` should
4542        // resolve via the hash fast path and round-trip every row.
4543        for id in 2..=5 {
4544            rt.execute_query(&format!(
4545                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4546                id * 10
4547            ))
4548            .unwrap();
4549        }
4550        for id in 1..=5 {
4551            let result = rt
4552                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4553                .unwrap();
4554            assert_eq!(
4555                result.result.records.len(),
4556                1,
4557                "id={id} should match one row"
4558            );
4559        }
4560
4561        // Delete via the hash fast-path — exactly the bench scenario the
4562        // perf doc identified as the 4× regression. With the index
4563        // present, `find_target_ids` short-circuits before
4564        // `for_each_entity_zoned` runs.
4565        let deleted = rt
4566            .execute_query("DELETE FROM bench_users WHERE id = 3")
4567            .unwrap();
4568        assert_eq!(deleted.affected_rows, 1);
4569    }
4570
4571    /// Bulk INSERT (the multi-row VALUES path) drives
4572    /// `MutationEngine::append_batch`. The hook must fire there too —
4573    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
4574    /// wire bulk INSERT) skip auto-indexing entirely.
4575    #[test]
4576    fn auto_index_id_fires_on_first_bulk_insert() {
4577        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4578        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4579            .unwrap();
4580
4581        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4582            .unwrap();
4583
4584        let registered = rt
4585            .index_store_ref()
4586            .find_index_for_column("bench_bulk", "id")
4587            .expect("auto-index hook should fire on first bulk insert");
4588        assert_eq!(registered.name, "idx_id");
4589
4590        // Every row populated via `index_entity_insert_batch`.
4591        for id in 1..=3 {
4592            let result = rt
4593                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4594                .unwrap();
4595            assert_eq!(result.result.records.len(), 1);
4596        }
4597    }
4598
4599    /// Hook is a no-op when the row carries no `id` column. Conservative
4600    /// match (case-sensitive `id`) — `Id`, `ID`, and `red_entity_id`
4601    /// don't trigger it.
4602    #[test]
4603    fn auto_index_id_skips_when_no_id_column() {
4604        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4605        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4606            .unwrap();
4607        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4608            .unwrap();
4609
4610        assert!(rt
4611            .index_store_ref()
4612            .find_index_for_column("plain", "id")
4613            .is_none());
4614        assert!(rt
4615            .index_store_ref()
4616            .find_index_for_column("plain", "uid")
4617            .is_none());
4618    }
4619
4620    /// Hook only fires once per collection. If an explicit
4621    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
4622    /// detects it via `find_index_for_column` and does NOT clobber it
4623    /// with a HASH index on the next insert.
4624    #[test]
4625    fn auto_index_id_skips_when_index_already_exists() {
4626        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4627        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4628            .unwrap();
4629        // User-declared BTREE index on `id` before any insert.
4630        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4631            .unwrap();
4632        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4633            .unwrap();
4634
4635        let registered = rt
4636            .index_store_ref()
4637            .find_index_for_column("pre", "id")
4638            .expect("user index should still be there");
4639        assert_eq!(
4640            registered.name, "user_idx",
4641            "auto-index hook must not overwrite an existing index"
4642        );
4643    }
4644
4645    /// Implicit `idx_id` is reaped when the collection drops. The
4646    /// existing `execute_drop_table` walks `list_indices` and drops every
4647    /// entry — confirm the auto-created index participates.
4648    #[test]
4649    fn auto_index_id_dropped_with_collection() {
4650        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4651        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4652            .unwrap();
4653        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4654            .unwrap();
4655        assert!(rt
4656            .index_store_ref()
4657            .find_index_for_column("ephemeral", "id")
4658            .is_some());
4659
4660        rt.execute_query("DROP TABLE ephemeral").unwrap();
4661
4662        assert!(
4663            rt.index_store_ref()
4664                .find_index_for_column("ephemeral", "id")
4665                .is_none(),
4666            "implicit `idx_id` must be reaped when its collection drops"
4667        );
4668    }
4669
4670    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
4671    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
4672    /// off, first insert leaves the collection without an `id` index —
4673    /// DELETE/UPDATE fall back to the scan path.
4674    #[test]
4675    fn auto_index_id_disabled_by_config() {
4676        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4677        let rt = RedDBRuntime::with_options(opts).unwrap();
4678
4679        rt.execute_query("CREATE TABLE off (id INT, score INT)")
4680            .unwrap();
4681        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4682            .unwrap();
4683
4684        assert!(
4685            rt.index_store_ref()
4686                .find_index_for_column("off", "id")
4687                .is_none(),
4688            "with auto_index_id=false, no implicit index should be created"
4689        );
4690    }
4691
4692    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
4693
4694    #[test]
4695    fn update_single_row_emits_update_event() {
4696        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4697        rt.execute_query(
4698            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4699        )
4700        .unwrap();
4701        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4702            .unwrap();
4703
4704        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4705            .unwrap();
4706
4707        let events = queue_payloads(&rt, "audit_log");
4708        assert_eq!(events.len(), 1, "expected exactly 1 update event");
4709        let event = events[0].as_object().expect("event payload object");
4710        assert_eq!(
4711            event.get("op").and_then(crate::json::Value::as_str),
4712            Some("update")
4713        );
4714        assert_eq!(
4715            event.get("collection").and_then(crate::json::Value::as_str),
4716            Some("users")
4717        );
4718        assert!(event
4719            .get("event_id")
4720            .and_then(crate::json::Value::as_str)
4721            .is_some_and(|v| !v.is_empty()));
4722        let before = event
4723            .get("before")
4724            .and_then(crate::json::Value::as_object)
4725            .expect("before must be an object");
4726        let after = event
4727            .get("after")
4728            .and_then(crate::json::Value::as_object)
4729            .expect("after must be an object");
4730        assert_eq!(
4731            before.get("name").and_then(crate::json::Value::as_str),
4732            Some("Alice"),
4733            "before.name should be the old value"
4734        );
4735        assert_eq!(
4736            after.get("name").and_then(crate::json::Value::as_str),
4737            Some("Bob"),
4738            "after.name should be the new value"
4739        );
4740    }
4741
4742    #[test]
4743    fn update_event_only_includes_changed_fields() {
4744        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4745        rt.execute_query(
4746            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4747        )
4748        .unwrap();
4749        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4750            .unwrap();
4751
4752        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4753            .unwrap();
4754
4755        let events = queue_payloads(&rt, "evts");
4756        assert_eq!(events.len(), 1);
4757        let event = events[0].as_object().unwrap();
4758        let before = event
4759            .get("before")
4760            .and_then(crate::json::Value::as_object)
4761            .unwrap();
4762        let after = event
4763            .get("after")
4764            .and_then(crate::json::Value::as_object)
4765            .unwrap();
4766        // Only changed field included.
4767        assert!(
4768            before.contains_key("name"),
4769            "before must include changed field"
4770        );
4771        assert!(
4772            after.contains_key("name"),
4773            "after must include changed field"
4774        );
4775        // Unchanged fields must not appear.
4776        assert!(
4777            !before.contains_key("email"),
4778            "before must not include unchanged email"
4779        );
4780        assert!(
4781            !after.contains_key("email"),
4782            "after must not include unchanged email"
4783        );
4784    }
4785
4786    #[test]
4787    fn multi_row_update_emits_one_event_per_row() {
4788        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4789        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4790            .unwrap();
4791        rt.execute_query(
4792            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4793        )
4794        .unwrap();
4795
4796        rt.execute_query("UPDATE items SET status = 'done'")
4797            .unwrap();
4798
4799        let events = queue_payloads(&rt, "evts");
4800        assert_eq!(events.len(), 3, "expected one update event per row");
4801        for event in &events {
4802            let obj = event.as_object().unwrap();
4803            assert_eq!(
4804                obj.get("op").and_then(crate::json::Value::as_str),
4805                Some("update")
4806            );
4807        }
4808    }
4809
4810    #[test]
4811    fn delete_single_row_emits_delete_event() {
4812        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4813        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4814            .unwrap();
4815        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4816            .unwrap();
4817
4818        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4819
4820        let events = queue_payloads(&rt, "del_log");
4821        assert_eq!(events.len(), 1);
4822        let event = events[0].as_object().expect("event payload object");
4823        assert_eq!(
4824            event.get("op").and_then(crate::json::Value::as_str),
4825            Some("delete")
4826        );
4827        assert_eq!(
4828            event.get("collection").and_then(crate::json::Value::as_str),
4829            Some("users")
4830        );
4831        assert!(event
4832            .get("event_id")
4833            .and_then(crate::json::Value::as_str)
4834            .is_some_and(|v| !v.is_empty()));
4835        let before = event
4836            .get("before")
4837            .and_then(crate::json::Value::as_object)
4838            .expect("before must be an object for delete");
4839        assert_eq!(
4840            before.get("id").and_then(crate::json::Value::as_u64),
4841            Some(42)
4842        );
4843        assert_eq!(
4844            before.get("name").and_then(crate::json::Value::as_str),
4845            Some("Alice")
4846        );
4847        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
4848    }
4849
4850    #[test]
4851    fn multi_row_delete_emits_one_event_per_row() {
4852        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4853        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
4854            .unwrap();
4855        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
4856            .unwrap();
4857
4858        rt.execute_query("DELETE FROM items").unwrap();
4859
4860        let events = queue_payloads(&rt, "del_log");
4861        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
4862        for event in &events {
4863            let obj = event.as_object().unwrap();
4864            assert_eq!(
4865                obj.get("op").and_then(crate::json::Value::as_str),
4866                Some("delete")
4867            );
4868            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
4869        }
4870    }
4871
4872    #[test]
4873    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
4874        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4875        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
4876            .unwrap();
4877
4878        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4879            .unwrap();
4880        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
4881
4882        let events = queue_payloads(&rt, "evts");
4883        assert!(
4884            events.is_empty(),
4885            "UPDATE-only filter must not emit INSERT or DELETE events"
4886        );
4887    }
4888
4889    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
4890
4891    #[test]
4892    fn suppress_events_on_insert_emits_no_events() {
4893        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4894        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4895            .unwrap();
4896
4897        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4898            .unwrap();
4899
4900        let events = queue_payloads(&rt, "evts");
4901        assert!(
4902            events.is_empty(),
4903            "SUPPRESS EVENTS must prevent INSERT events"
4904        );
4905    }
4906
4907    #[test]
4908    fn suppress_events_on_update_emits_no_events() {
4909        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4910        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4911            .unwrap();
4912        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4913            .unwrap();
4914        // drain the INSERT event
4915        let _ = queue_payloads(&rt, "evts");
4916        // Force pop to drain; simpler: just check new count after UPDATE
4917        rt.execute_query("QUEUE PURGE evts").unwrap();
4918
4919        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
4920            .unwrap();
4921
4922        let events = queue_payloads(&rt, "evts");
4923        assert!(
4924            events.is_empty(),
4925            "SUPPRESS EVENTS must prevent UPDATE events"
4926        );
4927    }
4928
4929    #[test]
4930    fn suppress_events_on_delete_emits_no_events() {
4931        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4932        rt.execute_query(
4933            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
4934        )
4935        .unwrap();
4936        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4937            .unwrap();
4938
4939        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
4940            .unwrap();
4941
4942        let events = queue_payloads(&rt, "evts");
4943        assert!(
4944            events.is_empty(),
4945            "SUPPRESS EVENTS must prevent DELETE events"
4946        );
4947    }
4948
4949    #[test]
4950    fn normal_insert_after_suppress_still_emits() {
4951        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4952        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4953            .unwrap();
4954
4955        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4956            .unwrap();
4957        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
4958            .unwrap();
4959
4960        let events = queue_payloads(&rt, "evts");
4961        assert_eq!(
4962            events.len(),
4963            1,
4964            "only the non-suppressed INSERT should emit"
4965        );
4966        assert_eq!(
4967            events[0].get("id").and_then(crate::json::Value::as_u64),
4968            Some(2)
4969        );
4970    }
4971}