Skip to main content

reddb_server/runtime/
impl_dml.rs

1//! DML execution: INSERT, UPDATE, DELETE via SQL AST
2//!
3//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
4//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
5//! operations through the existing `RuntimeEntityPort` trait so that all
6//! cross-cutting concerns (WAL, indexing, replication) are automatically
7//! applied.
8
9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11    CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12    CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13    RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16    build_row_update_contract_plan, entity_row_fields_snapshot,
17    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18    RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27    sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28    sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
/// Chunk size for applying UPDATE mutations. Presumably the batch size handed
/// to `persist_update_chunk` / `flush_update_chunk`; the call site is outside
/// this view (TODO confirm).
const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
/// Edge label used for tree child links. NOTE(review): usage is not visible
/// here, so the edge direction is unconfirmed.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Key prefix for tree-related metadata entries (usage not visible here).
const TREE_METADATA_PREFIX: &str = "red.tree.";
40
/// A single UPDATE assignment (`column = expr`) prepared for evaluation
/// against each matched row.
///
/// NOTE(review): the producer/consumer of this struct is outside this view;
/// the field notes below are inferred from names and types. Confirm them.
#[derive(Clone)]
struct CompiledUpdateAssignment {
    // Target column name as written in the SET clause.
    column: String,
    // Right-hand-side expression.
    expr: Expr,
    // Presumably the operator of a compound assignment (e.g. `col += expr`);
    // `None` for a plain `=`.
    compound_op: Option<BinOp>,
    // Presumably set when the column maps to a metadata key rather than a row field.
    metadata_key: Option<&'static str>,
    // Row-contract rule for the column, if any (cf. `normalize_row_update_value_for_rule`).
    row_rule: Option<RowUpdateColumnRule>,
}
49
/// Pre-compiled form of an UPDATE's SET clause.
///
/// NOTE(review): semantics inferred from field names; the builder is outside
/// this view. "Static" assignments appear to carry values known before the row
/// loop, while `dynamic_assignments` must be evaluated per row.
struct CompiledUpdatePlan {
    // Row-field assignments with a precomputed value.
    static_field_assignments: Vec<(String, Value)>,
    // Metadata assignments with a precomputed value.
    static_metadata_assignments: Vec<(String, MetadataValue)>,
    // Assignments that need per-row evaluation.
    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    // Row contract plan, presumably built by `build_row_update_contract_plan`.
    row_contract_plan: Option<RowUpdateContractPlan>,
    // Names of row columns the UPDATE modifies.
    row_modified_columns: Vec<String>,
    // Whether any modified column is covered by a unique constraint.
    row_touches_unique_columns: bool,
}
58
/// Concrete values produced by evaluating the dynamic assignments of a
/// `CompiledUpdatePlan` (presumably for one row; TODO confirm at the call site).
#[derive(Default)]
struct MaterializedUpdateAssignments {
    // Evaluated row-field assignments.
    dynamic_field_assignments: Vec<(String, Value)>,
    // Evaluated metadata assignments.
    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
}
64
65impl RedDBRuntime {
66    /// Issue #524 — public read of the in-memory chain tip. Returns `None`
67    /// when the collection is not a chain or has no rows (pre-genesis). On a
68    /// cold cache the first call falls back to a one-time scan so the HTTP
69    /// `GET /collections/:name/chain-tip` handler stays consistent with the
70    /// INSERT path after a restart.
71    pub fn chain_tip_for_collection(
72        &self,
73        collection: &str,
74    ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75        let store = self.inner.db.store();
76        if !crate::runtime::blockchain_kind::is_chain(&*store, collection) {
77            return None;
78        }
79        let mut cache = self.inner.chain_tip_cache.lock();
80        if let Some(existing) = cache.get(collection) {
81            return Some(existing.clone());
82        }
83        let scanned = crate::runtime::blockchain_kind::chain_tip_full(&*store, collection)?;
84        cache.insert(collection.to_string(), scanned.clone());
85        Some(scanned)
86    }
87
88    /// Issue #525 — walks the chain end-to-end, recomputes each block's hash
89    /// against the stored fields, and returns the verification outcome.  On
90    /// `ok == false` the integrity flag is persisted and the in-memory cache
91    /// is updated so subsequent INSERTs surface `ChainIntegrityBroken`.
92    ///
93    /// Returns `None` when the collection is absent or not a `KIND blockchain`.
94    pub fn verify_chain_for_collection(
95        &self,
96        collection: &str,
97    ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98        let store = self.inner.db.store();
99        let outcome =
100            crate::runtime::blockchain_kind::verify_chain_outcome(&*store, collection)?;
101        if !outcome.ok {
102            crate::runtime::blockchain_kind::persist_integrity_flag(&*store, collection, true);
103            self.inner
104                .chain_integrity_broken
105                .lock()
106                .insert(collection.to_string(), true);
107        }
108        Some(outcome)
109    }
110
111    /// Issue #525 — admin clears the `ChainIntegrityBroken` flag so the chain
112    /// accepts INSERTs again.  Returns `false` when the collection is not a
113    /// chain.
114    pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
115        let store = self.inner.db.store();
116        if !crate::runtime::blockchain_kind::is_chain(&*store, collection) {
117            return false;
118        }
119        crate::runtime::blockchain_kind::persist_integrity_flag(&*store, collection, false);
120        self.inner
121            .chain_integrity_broken
122            .lock()
123            .insert(collection.to_string(), false);
124        true
125    }
126
127    /// Issue #525 — INSERT-time check.  Combines in-memory cache (fast path)
128    /// with a one-time scan of `red_config` on cold start so the flag survives
129    /// restart.
130    fn is_chain_integrity_broken(&self, collection: &str) -> bool {
131        {
132            let cache = self.inner.chain_integrity_broken.lock();
133            if let Some(v) = cache.get(collection) {
134                return *v;
135            }
136        }
137        let store = self.inner.db.store();
138        let persisted =
139            crate::runtime::blockchain_kind::is_integrity_broken_persisted(&*store, collection)
140                .unwrap_or(false);
141        self.inner
142            .chain_integrity_broken
143            .lock()
144            .insert(collection.to_string(), persisted);
145        persisted
146    }
147
148    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
149    /// target table is tenant-scoped and the user's column list does
150    /// not already name the tenant column.
151    ///
152    /// Returns:
153    /// * `Ok(None)` — no injection needed (non-tenant table, or user
154    ///   supplied the column explicitly). Caller uses the original
155    ///   query unchanged.
156    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
157    ///   + literal value appended to every row.
158    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
159    ///   the current session. Fails loudly so callers don't produce
160    ///   rows that RLS would then hide on read.
161    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
162        let Some(tenant_col) = self.tenant_column(&query.table) else {
163            return Ok(None);
164        };
165        // User already named the column (literal match) — trust them.
166        if query
167            .columns
168            .iter()
169            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
170        {
171            return Ok(None);
172        }
173
174        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
175        // nested key like `headers.tenant` we operate on the root
176        // column (`headers`) and set / add the nested path inside its
177        // JSON value. If the user named the root column we mutate in
178        // place; otherwise we create a fresh JSON column for every row.
179        if let Some(dot_pos) = tenant_col.find('.') {
180            let (root, tail) = tenant_col.split_at(dot_pos);
181            let tail = &tail[1..]; // drop leading '.'
182            return self.inject_dotted_tenant(query, root, tail);
183        }
184
185        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
186            return Err(RedDBError::Query(format!(
187                "INSERT into tenant-scoped table '{}' requires an active tenant — \
188                 run SET TENANT '<id>' first or name column '{}' explicitly",
189                query.table, tenant_col
190            )));
191        };
192
193        let mut augmented = query.clone();
194        augmented.columns.push(tenant_col);
195        let lit = Value::text(tenant_id.clone());
196        for row in augmented.values.iter_mut() {
197            row.push(lit.clone());
198        }
199        for row in augmented.value_exprs.iter_mut() {
200            row.push(crate::storage::query::ast::Expr::Literal {
201                value: lit.clone(),
202                span: crate::storage::query::ast::Span::synthetic(),
203            });
204        }
205        Ok(Some(augmented))
206    }
207
208    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
209    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
210    /// nested JSON instead of appending a flat column.
211    ///
212    /// Cases:
213    /// * Root column already in the INSERT list → mutate per-row JSON
214    ///   (parse, set path, re-serialize).
215    /// * Root column absent → create a fresh `{tail: tenant}` JSON
216    ///   object and append the root column to the INSERT.
217    fn inject_dotted_tenant(
218        &self,
219        query: &InsertQuery,
220        root: &str,
221        tail: &str,
222    ) -> RedDBResult<Option<InsertQuery>> {
223        let active_tenant = crate::runtime::impl_core::current_tenant();
224        let mut augmented = query.clone();
225        let root_idx = augmented
226            .columns
227            .iter()
228            .position(|c| c.eq_ignore_ascii_case(root));
229
230        if let Some(idx) = root_idx {
231            // User supplied the root column. Per-row: if the dotted
232            // tail is already present we trust the user (admin / bulk
233            // loader scenario); otherwise fill from the active
234            // tenant. An unbound tenant is only an error when some
235            // row actually needs filling.
236            for row in augmented.values.iter_mut() {
237                let Some(slot) = row.get_mut(idx) else {
238                    continue;
239                };
240                if dotted_tail_already_set(slot, tail) {
241                    continue;
242                }
243                let Some(tenant_id) = &active_tenant else {
244                    return Err(RedDBError::Query(format!(
245                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
246                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
247                        query.table, root, tail
248                    )));
249                };
250                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
251            }
252            // Expression row is kept in sync by re-wrapping the
253            // mutated literal; the canonical path will re-evaluate
254            // against the same JSON shape.
255            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
256                if let Some(slot) = row.get_mut(idx) {
257                    let new_value = augmented
258                        .values
259                        .get(row_idx)
260                        .and_then(|v| v.get(idx))
261                        .cloned()
262                        .unwrap_or(Value::Null);
263                    *slot = crate::storage::query::ast::Expr::Literal {
264                        value: new_value,
265                        span: crate::storage::query::ast::Span::synthetic(),
266                    };
267                }
268            }
269        } else {
270            // No root column in the INSERT list — auto-fill needs a
271            // bound tenant to synthesise one. Error loud so we never
272            // create a tenant-less row that RLS would then hide.
273            let Some(tenant_id) = &active_tenant else {
274                return Err(RedDBError::Query(format!(
275                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
276                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
277                    query.table, root, tail
278                )));
279            };
280            // Create a fresh JSON column with only the tenant path set.
281            augmented.columns.push(root.to_string());
282            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
283            for row in augmented.values.iter_mut() {
284                row.push(fresh.clone());
285            }
286            for row in augmented.value_exprs.iter_mut() {
287                row.push(crate::storage::query::ast::Expr::Literal {
288                    value: fresh.clone(),
289                    span: crate::storage::query::ast::Span::synthetic(),
290                });
291            }
292        }
293
294        Ok(Some(augmented))
295    }
296
297    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
298    /// `lsns` is empty because events fire at commit time.
299    fn delete_entities_batch(
300        &self,
301        collection: &str,
302        ids: &[EntityId],
303    ) -> RedDBResult<(u64, Vec<u64>)> {
304        if ids.is_empty() {
305            return Ok((0, vec![]));
306        }
307
308        let store = self.db().store();
309        let Some(manager) = store.get_collection(collection) else {
310            return Ok((0, vec![]));
311        };
312
313        let active_xid = self.current_xid();
314        let conn_id = crate::runtime::impl_core::current_connection_id();
315        let mut autocommit_xid = None;
316        let mut tombstoned_ids = Vec::new();
317        let mut tombstoned_entities = Vec::new();
318        let mut physical_delete_ids = Vec::new();
319        let table_row_resolver =
320            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
321
322        for &id in ids {
323            let Some(mut entity) = manager.get(id) else {
324                continue;
325            };
326            if matches!(entity.data, EntityData::Row(_)) {
327                let previous_xmax = entity.xmax;
328                if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
329                    if table_row_resolver.resolve_candidate(&entity).is_none() {
330                        continue;
331                    }
332                } else if entity.xmax != 0 {
333                    continue;
334                }
335
336                let xid = match active_xid {
337                    Some(xid) => xid,
338                    None => match autocommit_xid {
339                        Some(xid) => xid,
340                        None => {
341                            let mgr = self.snapshot_manager();
342                            let xid = mgr.begin();
343                            autocommit_xid = Some(xid);
344                            xid
345                        }
346                    },
347                };
348                entity.set_xmax(xid);
349                if manager.update(entity.clone()).is_ok() {
350                    if active_xid.is_some() {
351                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
352                    }
353                    tombstoned_entities.push(entity);
354                    tombstoned_ids.push(id);
355                }
356            } else {
357                physical_delete_ids.push(id);
358            }
359        }
360
361        if let Some(xid) = autocommit_xid {
362            self.snapshot_manager().commit(xid);
363        }
364
365        let mut affected = tombstoned_ids.len() as u64;
366        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
367        if active_xid.is_some() {
368            store
369                .persist_entities_to_pager(collection, &tombstoned_entities)
370                .map_err(|err| RedDBError::Internal(err.to_string()))?;
371        } else {
372            store
373                .persist_entities_to_pager(collection, &tombstoned_entities)
374                .map_err(|err| RedDBError::Internal(err.to_string()))?;
375            for id in &tombstoned_ids {
376                store.context_index().remove_entity(*id);
377                let lsn = self.cdc_emit(
378                    crate::replication::cdc::ChangeOperation::Delete,
379                    collection,
380                    id.raw(),
381                    "entity",
382                );
383                lsns.push(lsn);
384            }
385        }
386
387        let deleted_ids = store
388            .delete_batch(collection, &physical_delete_ids)
389            .map_err(|err| RedDBError::Internal(err.to_string()))?;
390        affected += deleted_ids.len() as u64;
391        for id in &deleted_ids {
392            store.context_index().remove_entity(*id);
393            let lsn = self.cdc_emit(
394                crate::replication::cdc::ChangeOperation::Delete,
395                collection,
396                id.raw(),
397                "entity",
398            );
399            lsns.push(lsn);
400        }
401
402        Ok((affected, lsns))
403    }
404
405    /// Flushes context-index updates and CDC for each applied mutation.
406    /// Returns one LSN per entity in the same order as `applied`.
407    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
408        if applied.is_empty() {
409            return Ok(Vec::new());
410        }
411
412        let store = self.db().store();
413        if applied.iter().any(|item| item.context_index_dirty) {
414            store.context_index().index_entities(
415                &applied[0].collection,
416                applied
417                    .iter()
418                    .filter(|item| item.context_index_dirty)
419                    .map(|item| &item.entity),
420            );
421        }
422
423        for item in applied {
424            self.refresh_update_secondary_indexes(item)?;
425        }
426
427        let mut lsns = Vec::with_capacity(applied.len());
428        for item in applied {
429            let lsn = self.cdc_emit_prebuilt(
430                crate::replication::cdc::ChangeOperation::Update,
431                &item.collection,
432                &item.entity,
433                update_cdc_item_kind(self, &item.collection, &item.entity),
434                item.metadata.as_ref(),
435                false,
436            );
437            lsns.push(lsn);
438        }
439        Ok(lsns)
440    }
441
    /// Persists one chunk of applied UPDATE mutations by delegating to
    /// `persist_applied_entity_mutations`, returning its result unchanged.
    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
        self.persist_applied_entity_mutations(applied)
    }
445
446    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
447        if applied.pre_mutation_fields.is_empty() {
448            return Ok(());
449        }
450        let post = entity_row_fields_snapshot(&applied.entity);
451        if post.is_empty() {
452            return Ok(());
453        }
454
455        let indexed_cols = self
456            .index_store_ref()
457            .indexed_columns_set(&applied.collection);
458        if indexed_cols.is_empty() {
459            return Ok(());
460        }
461
462        if let Some(old_version) = applied.replaced_entity.as_ref() {
463            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
464                .pre_mutation_fields
465                .iter()
466                .filter(|(col, _)| indexed_cols.contains(col))
467                .cloned()
468                .collect();
469            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
470                .iter()
471                .filter(|(col, _)| indexed_cols.contains(col))
472                .cloned()
473                .collect();
474            if !old_index_fields.is_empty() {
475                self.index_store_ref()
476                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
477                    .map_err(crate::RedDBError::Internal)?;
478            }
479            if !new_index_fields.is_empty() {
480                self.index_store_ref()
481                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
482                    .map_err(crate::RedDBError::Internal)?;
483            }
484            return Ok(());
485        }
486
487        let damage =
488            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
489        if damage
490            .touched_columns()
491            .into_iter()
492            .any(|col| indexed_cols.contains(col))
493        {
494            self.index_store_ref()
495                .index_entity_update(
496                    &applied.collection,
497                    applied.id,
498                    &applied.pre_mutation_fields,
499                    &post,
500                )
501                .map_err(crate::RedDBError::Internal)?;
502        }
503        Ok(())
504    }
505
506    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
507    ///
508    /// Each row in `query.values` is zipped with `query.columns` to produce a
509    /// set of named fields, which is then dispatched based on entity_type.
510    pub fn execute_insert(
511        &self,
512        raw_query: &str,
513        query: &InsertQuery,
514    ) -> RedDBResult<RuntimeQueryResult> {
515        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
516        // CollectionContract gate (#49): single entry point for the
517        // operator's collection-level write rules. Today this is a
518        // no-op for INSERT (APPEND ONLY permits insert); routing
519        // through the gate now means future contract bits — versioned,
520        // vault-only writes — plug in once instead of per verb.
521        crate::runtime::collection_contract::CollectionContractGate::check(
522            self,
523            &query.table,
524            crate::runtime::collection_contract::MutationKind::Insert,
525        )?;
526        // Phase 2.5.4 table-scoped tenancy: if the target table is
527        // tenant-scoped and the user didn't name the tenant column,
528        // auto-inject it with the thread-local `CURRENT_TENANT()`
529        // value. When the column is named explicitly we trust the
530        // caller (useful for admin tooling that writes on behalf of
531        // specific tenants). An unbound tenant on an implicit-fill
532        // path errors up front rather than producing a row the RLS
533        // policy would silently hide.
534        let augmented_owned;
535        let query = match self.maybe_inject_tenant_column(query)? {
536            Some(new_q) => {
537                augmented_owned = new_q;
538                &augmented_owned
539            }
540            None => query,
541        };
542        self.check_insert_column_policy(query)?;
543
544        let mut inserted_count: u64 = 0;
545        let effective_rows =
546            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
547
548        // Ensure the collection exists (auto-create on first insert).
549        let store = self.inner.db.store();
550        let _ = store.get_or_create_collection(&query.table);
551        let declared_model = self
552            .db()
553            .collection_contract_arc(&query.table)
554            .map(|contract| contract.declared_model);
555
556        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
557            if query.returning.is_some() {
558                Some(Vec::with_capacity(effective_rows.len()))
559            } else {
560                None
561            };
562        let mut returning_result: Option<UnifiedResult> = None;
563
564        if matches!(query.entity_type, InsertEntityType::Row)
565            && !matches!(
566                declared_model,
567                Some(crate::catalog::CollectionModel::TimeSeries)
568            )
569        {
570            // Issue #523 + #524: blockchain collections seal each row into the
571            // chain. When the caller omits the reserved columns, the engine
572            // auto-fills (#523). When the caller supplies any reserved column,
573            // the values are validated against the current tip and a mismatch
574            // surfaces a `BlockchainConflict:` error mapped to HTTP 409 (#524).
575            //
576            // The whole batch runs under a per-collection chain lock so two
577            // concurrent submitters can't both bind to the same prev_hash —
578            // the loser observes the advanced tip and gets 409 with the new
579            // tip so it can retry.
580            let chain_mode = crate::runtime::blockchain_kind::is_chain(&*store, &query.table);
581            let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
582                Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
583            } else {
584                None
585            };
586            let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
587
588            // Issue #525 — refuse new blocks if the chain has been marked
589            // `integrity = broken` until an admin clears the flag.
590            if chain_mode && self.is_chain_integrity_broken(&query.table) {
591                return Err(RedDBError::InvalidOperation(format!(
592                    "ChainIntegrityBroken: collection '{}' is locked until \
593                     POST /collections/{}/clear-integrity-flag is called by an admin",
594                    query.table, query.table
595                )));
596            }
597
598            // Pull the tip from the in-memory cache; fall back to a one-time
599            // scan if the cache hasn't seen this collection yet (cold start
600            // after restart). Cache is updated below as rows are sealed.
601            let mut chain_tip_full: Option<
602                crate::runtime::blockchain_kind::ChainTipFull,
603            > = if chain_mode {
604                let mut cache = self.inner.chain_tip_cache.lock();
605                if let Some(existing) = cache.get(&query.table) {
606                    Some(existing.clone())
607                } else if let Some(scanned) =
608                    crate::runtime::blockchain_kind::chain_tip_full(&*store, &query.table)
609                {
610                    cache.insert(query.table.clone(), scanned.clone());
611                    Some(scanned)
612                } else {
613                    None
614                }
615            } else {
616                None
617            };
618
619            let mut rows = Vec::with_capacity(effective_rows.len());
620            for row_values in &effective_rows {
621                if row_values.len() != query.columns.len() {
622                    return Err(RedDBError::Query(format!(
623                        "INSERT column count ({}) does not match value count ({})",
624                        query.columns.len(),
625                        row_values.len()
626                    )));
627                }
628                let (mut fields, mut metadata) =
629                    split_insert_metadata(self, &query.columns, row_values)?;
630                if chain_mode {
631                    use crate::runtime::blockchain_kind::{
632                        chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
633                        COL_TIMESTAMP, RESERVED_COLUMNS,
634                    };
635                    let supplied_height = fields
636                        .iter()
637                        .find(|(k, _)| k == COL_BLOCK_HEIGHT)
638                        .map(|(_, v)| v.clone());
639                    let supplied_prev = fields
640                        .iter()
641                        .find(|(k, _)| k == COL_PREV_HASH)
642                        .map(|(_, v)| v.clone());
643                    let supplied_ts = fields
644                        .iter()
645                        .find(|(k, _)| k == COL_TIMESTAMP)
646                        .map(|(_, v)| v.clone());
647                    let supplied_hash =
648                        fields.iter().any(|(k, _)| k == COL_HASH);
649                    let user_supplied_any = supplied_height.is_some()
650                        || supplied_prev.is_some()
651                        || supplied_ts.is_some()
652                        || supplied_hash;
653
654                    fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
655                    let payload =
656                        crate::runtime::blockchain_kind::canonical_payload(&fields);
657
658                    let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
659                        Some(t) => (t.hash, t.height + 1),
660                        None => (
661                            crate::storage::blockchain::GENESIS_PREV_HASH,
662                            0u64,
663                        ),
664                    };
665                    let server_now = crate::runtime::blockchain_kind::now_ms();
666
667                    let (use_prev, use_height, use_ts) = if user_supplied_any {
668                        // Caller is participating in the chain protocol —
669                        // every field must be supplied AND match the tip.
670                        if supplied_hash {
671                            return Err(chain_conflict_error(
672                                tip_next_height.saturating_sub(1),
673                                tip_prev_hash,
674                                chain_tip_full
675                                    .as_ref()
676                                    .map(|t| t.timestamp_ms)
677                                    .unwrap_or(0),
678                                server_now,
679                                "hash column is engine-computed and cannot be supplied",
680                            ));
681                        }
682                        let caller_prev = match &supplied_prev {
683                            Some(Value::Blob(b)) if b.len() == 32 => {
684                                let mut a = [0u8; 32];
685                                a.copy_from_slice(b);
686                                a
687                            }
688                            Some(Value::Text(s)) if s.len() == 64 => {
689                                // Accept hex-encoded prev_hash so JSON / SQL
690                                // callers without literal-blob syntax can
691                                // still participate in the chain protocol.
692                                let mut a = [0u8; 32];
693                                let mut ok = true;
694                                for i in 0..32 {
695                                    let pair = &s.as_ref()[i * 2..i * 2 + 2];
696                                    match u8::from_str_radix(pair, 16) {
697                                        Ok(byte) => a[i] = byte,
698                                        Err(_) => {
699                                            ok = false;
700                                            break;
701                                        }
702                                    }
703                                }
704                                if !ok {
705                                    return Err(chain_conflict_error(
706                                        tip_next_height.saturating_sub(1),
707                                        tip_prev_hash,
708                                        chain_tip_full
709                                            .as_ref()
710                                            .map(|t| t.timestamp_ms)
711                                            .unwrap_or(0),
712                                        server_now,
713                                        "prev_hash is not valid hex",
714                                    ));
715                                }
716                                a
717                            }
718                            _ => {
719                                return Err(chain_conflict_error(
720                                    tip_next_height.saturating_sub(1),
721                                    tip_prev_hash,
722                                    chain_tip_full
723                                        .as_ref()
724                                        .map(|t| t.timestamp_ms)
725                                        .unwrap_or(0),
726                                    server_now,
727                                    "prev_hash missing or not a 32-byte Blob",
728                                ));
729                            }
730                        };
731                        if caller_prev != tip_prev_hash {
732                            return Err(chain_conflict_error(
733                                tip_next_height.saturating_sub(1),
734                                tip_prev_hash,
735                                chain_tip_full
736                                    .as_ref()
737                                    .map(|t| t.timestamp_ms)
738                                    .unwrap_or(0),
739                                server_now,
740                                "prev_hash does not match current tip",
741                            ));
742                        }
743                        let caller_height = match &supplied_height {
744                            Some(Value::UnsignedInteger(v)) => *v,
745                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
746                            _ => {
747                                return Err(chain_conflict_error(
748                                    tip_next_height.saturating_sub(1),
749                                    tip_prev_hash,
750                                    chain_tip_full
751                                        .as_ref()
752                                        .map(|t| t.timestamp_ms)
753                                        .unwrap_or(0),
754                                    server_now,
755                                    "block_height missing or not an unsigned integer",
756                                ));
757                            }
758                        };
759                        if caller_height != tip_next_height {
760                            return Err(chain_conflict_error(
761                                tip_next_height.saturating_sub(1),
762                                tip_prev_hash,
763                                chain_tip_full
764                                    .as_ref()
765                                    .map(|t| t.timestamp_ms)
766                                    .unwrap_or(0),
767                                server_now,
768                                "block_height does not match tip+1",
769                            ));
770                        }
771                        let caller_ts = match &supplied_ts {
772                            Some(Value::UnsignedInteger(v)) => *v,
773                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
774                            _ => {
775                                return Err(chain_conflict_error(
776                                    tip_next_height.saturating_sub(1),
777                                    tip_prev_hash,
778                                    chain_tip_full
779                                        .as_ref()
780                                        .map(|t| t.timestamp_ms)
781                                        .unwrap_or(0),
782                                    server_now,
783                                    "timestamp missing or not an unsigned integer",
784                                ));
785                            }
786                        };
787                        let drift = (caller_ts as i128) - (server_now as i128);
788                        if drift.abs() > 60_000 {
789                            return Err(chain_conflict_error(
790                                tip_next_height.saturating_sub(1),
791                                tip_prev_hash,
792                                chain_tip_full
793                                    .as_ref()
794                                    .map(|t| t.timestamp_ms)
795                                    .unwrap_or(0),
796                                server_now,
797                                "timestamp outside ±60s of server_time",
798                            ));
799                        }
800                        (caller_prev, caller_height, caller_ts)
801                    } else {
802                        (tip_prev_hash, tip_next_height, server_now)
803                    };
804
805                    let (reserved, new_hash) =
806                        crate::runtime::blockchain_kind::make_block_reserved_fields(
807                            use_prev, use_height, use_ts, &payload,
808                        );
809                    fields.extend(reserved);
810                    chain_tip_full =
811                        Some(crate::runtime::blockchain_kind::ChainTipFull {
812                            height: use_height,
813                            hash: new_hash,
814                            timestamp_ms: use_ts,
815                        });
816                }
817                // Issue #522 — signed-writes verification. On collections
818                // created with `SIGNED_BY (...)` the row must carry valid
819                // `signer_pubkey` + `signature` reserved columns. Runs
820                // after chain_mode so canonical payload covers user-supplied
821                // fields only (blockchain reserved columns are filtered by
822                // `canonical_payload`; the two signed-writes reserved
823                // columns are split out before payload computation, then
824                // re-attached for storage). The blockchain + SIGNED_BY
825                // composition is owned by issue #526; we keep #522 to the
826                // non-chain path and let chain_mode collections punt to that
827                // slice rather than half-wire it here.
828                if crate::runtime::signed_writes_kind::is_signed(&*store, &query.table)
829                {
830                    let (pk_col, sig_col, residual) =
831                        crate::runtime::signed_writes_kind::split_signature_fields(fields);
832                    let payload =
833                        crate::runtime::blockchain_kind::canonical_payload(&residual);
834                    let reg = crate::runtime::signed_writes_kind::registry(
835                        &*store,
836                        &query.table,
837                    );
838                    crate::runtime::signed_writes_kind::verify_row(
839                        &reg,
840                        pk_col.as_ref().map(|c| c.bytes.as_slice()),
841                        sig_col.as_ref().map(|c| c.bytes.as_slice()),
842                        &payload,
843                    )
844                    .map_err(crate::runtime::signed_writes_kind::map_error)?;
845                    fields = residual;
846                    // Round-trip the reserved columns with the value
847                    // type the caller supplied (Text/hex on the SQL path,
848                    // Blob on the binary path). Keeps SELECT and WHERE
849                    // predicates symmetric with the INSERT shape.
850                    if let Some(col) = pk_col {
851                        fields.push((
852                            crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL
853                                .to_string(),
854                            col.raw_value,
855                        ));
856                    }
857                    if let Some(col) = sig_col {
858                        fields.push((
859                            crate::storage::signed_writes::RESERVED_SIGNATURE_COL
860                                .to_string(),
861                            col.raw_value,
862                        ));
863                    }
864                }
865                merge_with_clauses(
866                    &mut metadata,
867                    query.ttl_ms,
868                    query.expires_at_ms,
869                    &query.with_metadata,
870                );
871                if let Some(snaps) = returning_snapshots.as_mut() {
872                    snaps.push(fields.clone());
873                }
874                rows.push(CreateRowInput {
875                    collection: query.table.clone(),
876                    fields,
877                    metadata,
878                    node_links: Vec::new(),
879                    vector_links: Vec::new(),
880                });
881            }
882            let outputs = self.create_rows_batch(CreateRowsBatchInput {
883                collection: query.table.clone(),
884                rows,
885                suppress_events: query.suppress_events,
886            })?;
887            inserted_count = outputs.len() as u64;
888
889            // Chain mode: commit the new tip to the in-memory cache only after
890            // the batch persisted successfully. If the batch threw mid-way the
891            // cache stays on the previous tip and the chain lock releases.
892            if chain_mode {
893                if let Some(new_tip) = chain_tip_full.as_ref() {
894                    self.inner
895                        .chain_tip_cache
896                        .lock()
897                        .insert(query.table.clone(), new_tip.clone());
898                }
899            }
900
901            // Hypertable chunk routing: if this table was declared via
902            // CREATE HYPERTABLE, register each row's time-column value
903            // with the registry so chunk metadata (bounds, row counts,
904            // TTL eligibility) stays current. This is what lets
905            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
906            // retention daemon sweep expired chunks without scanning
907            // every row.
908            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
909                let time_col = &spec.time_column;
910                // Find the column's index in the INSERT column list.
911                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
912                    for row in &effective_rows {
913                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
914                            if *n >= 0 {
915                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
916                            }
917                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
918                            let _ = self.inner.db.hypertables().route(&query.table, *n);
919                        }
920                    }
921                }
922            }
923
924            if let (Some(items), Some(snaps)) =
925                (query.returning.as_ref(), returning_snapshots.take())
926            {
927                let snaps = row_insert_returning_snapshots(&outputs, snaps);
928                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
929            }
930        } else {
931            // Issue #419: surface the inserted entity id on every INSERT path.
932            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
933            // so a RETURNING clause (and the unconditional inserted_ids list,
934            // below) can expose the engine-assigned id. TimeSeries (the row
935            // branch in this else) still returns the not-supported error
936            // because create_timeseries_point isn't plumbed through this fn.
937            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
938                Vec::with_capacity(effective_rows.len());
939            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
940            {
941                Vec::with_capacity(effective_rows.len())
942            } else {
943                Vec::new()
944            };
945            if matches!(
946                query.entity_type,
947                InsertEntityType::Node | InsertEntityType::Edge
948            ) {
949                enum PreparedGraphInsert {
950                    Node {
951                        fields: Vec<(String, Value)>,
952                        input: CreateNodeInput,
953                    },
954                    Edge {
955                        fields: Vec<(String, Value)>,
956                        input: CreateEdgeInput,
957                    },
958                }
959
960                let mut prepared = Vec::with_capacity(effective_rows.len());
961                for row_values in &effective_rows {
962                    if row_values.len() != query.columns.len() {
963                        return Err(RedDBError::Query(format!(
964                            "INSERT column count ({}) does not match value count ({})",
965                            query.columns.len(),
966                            row_values.len()
967                        )));
968                    }
969
970                    match query.entity_type {
971                        InsertEntityType::Node => {
972                            let (node_values, mut metadata) =
973                                split_insert_metadata(self, &query.columns, row_values)?;
974                            merge_with_clauses(
975                                &mut metadata,
976                                query.ttl_ms,
977                                query.expires_at_ms,
978                                &query.with_metadata,
979                            );
980                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
981                            apply_collection_default_ttl_metadata(
982                                self,
983                                &query.table,
984                                &mut metadata,
985                            );
986                            let (columns, values) = pairwise_columns_values(&node_values);
987                            let label = find_column_value_string(&columns, &values, "label")?;
988                            let node_type =
989                                find_column_value_opt_string(&columns, &values, "node_type");
990                            let properties = extract_remaining_properties(
991                                &columns,
992                                &values,
993                                &["label", "node_type"],
994                            );
995                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
996                                properties.iter().map(|(key, _)| key.as_str()),
997                                &format!("node '{}'", query.table),
998                            )?;
999                            prepared.push(PreparedGraphInsert::Node {
1000                                fields: node_values,
1001                                input: CreateNodeInput {
1002                                    collection: query.table.clone(),
1003                                    label,
1004                                    node_type,
1005                                    properties,
1006                                    metadata,
1007                                    embeddings: Vec::new(),
1008                                    table_links: Vec::new(),
1009                                    node_links: Vec::new(),
1010                                },
1011                            });
1012                        }
1013                        InsertEntityType::Edge => {
1014                            let (edge_values, mut metadata) =
1015                                split_insert_metadata(self, &query.columns, row_values)?;
1016                            merge_with_clauses(
1017                                &mut metadata,
1018                                query.ttl_ms,
1019                                query.expires_at_ms,
1020                                &query.with_metadata,
1021                            );
1022                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
1023                            apply_collection_default_ttl_metadata(
1024                                self,
1025                                &query.table,
1026                                &mut metadata,
1027                            );
1028                            let (columns, values) = pairwise_columns_values(&edge_values);
1029                            let label = find_column_value_string(&columns, &values, "label")?;
1030                            ensure_non_tree_structural_edge_label(&label)?;
1031                            let from_id = resolve_edge_endpoint_any(
1032                                self.inner.db.store().as_ref(),
1033                                &query.table,
1034                                &columns,
1035                                &values,
1036                                &["from_rid", "from"],
1037                            )?;
1038                            let to_id = resolve_edge_endpoint_any(
1039                                self.inner.db.store().as_ref(),
1040                                &query.table,
1041                                &columns,
1042                                &values,
1043                                &["to_rid", "to"],
1044                            )?;
1045                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
1046                            let properties = extract_remaining_properties(
1047                                &columns,
1048                                &values,
1049                                &["label", "from_rid", "to_rid", "from", "to", "weight"],
1050                            );
1051                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1052                                properties.iter().map(|(key, _)| key.as_str()),
1053                                &format!("edge '{}'", query.table),
1054                            )?;
1055                            prepared.push(PreparedGraphInsert::Edge {
1056                                fields: edge_values,
1057                                input: CreateEdgeInput {
1058                                    collection: query.table.clone(),
1059                                    label,
1060                                    from: EntityId::new(from_id),
1061                                    to: EntityId::new(to_id),
1062                                    weight,
1063                                    properties,
1064                                    metadata,
1065                                },
1066                            });
1067                        }
1068                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1069                    }
1070                }
1071
1072                ensure_graph_insert_contract(self, &query.table)?;
1073                let mut batch = self.inner.db.batch();
1074                for item in prepared {
1075                    match item {
1076                        PreparedGraphInsert::Node { fields, input } => {
1077                            if query.returning.is_some() {
1078                                returning_field_snaps.push(fields);
1079                            }
1080                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1081                            batch = batch.add_node_with_type(
1082                                input.collection,
1083                                input.label,
1084                                node_type,
1085                                input.properties.into_iter().collect(),
1086                                input.metadata.into_iter().collect(),
1087                            );
1088                        }
1089                        PreparedGraphInsert::Edge { fields, input } => {
1090                            if query.returning.is_some() {
1091                                returning_field_snaps.push(fields);
1092                            }
1093                            batch = batch.add_edge(
1094                                input.collection,
1095                                input.label,
1096                                input.from,
1097                                input.to,
1098                                input.weight.unwrap_or(1.0),
1099                                input.properties.into_iter().collect(),
1100                                input.metadata.into_iter().collect(),
1101                            );
1102                        }
1103                    }
1104                }
1105                let batch_result = batch
1106                    .execute()
1107                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1108                let (ids, entity_kind) = match query.entity_type {
1109                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1110                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1111                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1112                };
1113                for id in &ids {
1114                    self.stamp_xmin_if_in_txn(&query.table, *id);
1115                }
1116                if query.returning.is_some() {
1117                    returning_field_snaps = graph_insert_returning_snapshots(
1118                        self.inner.db.store().as_ref(),
1119                        &query.table,
1120                        &ids,
1121                    );
1122                }
1123                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1124                let store = self.inner.db.store();
1125                entity_outputs.extend(ids.iter().map(|id| {
1126                    crate::application::entity::CreateEntityOutput {
1127                        id: *id,
1128                        entity: store.get(&query.table, *id),
1129                    }
1130                }));
1131                inserted_count = ids.len() as u64;
1132            } else {
1133                for row_values in &effective_rows {
1134                    if row_values.len() != query.columns.len() {
1135                        return Err(RedDBError::Query(format!(
1136                            "INSERT column count ({}) does not match value count ({})",
1137                            query.columns.len(),
1138                            row_values.len()
1139                        )));
1140                    }
1141
1142                    match query.entity_type {
1143                        InsertEntityType::Row => {
1144                            if query.returning.is_some() {
1145                                return Err(RedDBError::Query(
1146                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1147                                    .to_string(),
1148                            ));
1149                            }
1150                            let (fields, mut metadata) =
1151                                split_insert_metadata(self, &query.columns, row_values)?;
1152                            merge_with_clauses(
1153                                &mut metadata,
1154                                query.ttl_ms,
1155                                query.expires_at_ms,
1156                                &query.with_metadata,
1157                            );
1158                            self.insert_timeseries_point(&query.table, fields, metadata)?;
1159                        }
1160                        InsertEntityType::Node | InsertEntityType::Edge => {
1161                            unreachable!("NODE and EDGE are handled by the prepared graph path")
1162                        }
1163                        InsertEntityType::Vector => {
1164                            let (vector_values, mut metadata) =
1165                                split_insert_metadata(self, &query.columns, row_values)?;
1166                            merge_with_clauses(
1167                                &mut metadata,
1168                                query.ttl_ms,
1169                                query.expires_at_ms,
1170                                &query.with_metadata,
1171                            );
1172                            let (columns, values) = pairwise_columns_values(&vector_values);
1173                            let dense = find_column_value_vec_f32_any(
1174                                &columns,
1175                                &values,
1176                                &["dense", "embedding"],
1177                            )?;
1178                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1179                            let content =
1180                                find_column_value_opt_string(&columns, &values, "content");
1181                            if query.returning.is_some() {
1182                                returning_field_snaps.push(vector_values.clone());
1183                            }
1184                            let input = CreateVectorInput {
1185                                collection: query.table.clone(),
1186                                dense,
1187                                content,
1188                                metadata,
1189                                link_row: None,
1190                                link_node: None,
1191                            };
1192                            entity_outputs.push(self.create_vector(input)?);
1193                        }
1194                        InsertEntityType::Document => {
1195                            let (document_values, mut metadata) =
1196                                split_insert_metadata(self, &query.columns, row_values)?;
1197                            merge_with_clauses(
1198                                &mut metadata,
1199                                query.ttl_ms,
1200                                query.expires_at_ms,
1201                                &query.with_metadata,
1202                            );
1203                            let (columns, values) = pairwise_columns_values(&document_values);
1204                            let body_str = find_column_value_string(&columns, &values, "body")?;
1205                            let body: crate::json::Value = crate::json::from_str(&body_str)
1206                                .map_err(|e| {
1207                                    RedDBError::Query(format!("invalid JSON body: {e}"))
1208                                })?;
1209                            let input = CreateDocumentInput {
1210                                collection: query.table.clone(),
1211                                body,
1212                                metadata,
1213                                node_links: Vec::new(),
1214                                vector_links: Vec::new(),
1215                            };
1216                            let output = self.create_document(input)?;
1217                            if query.returning.is_some() {
1218                                let fields = output
1219                                    .entity
1220                                    .as_ref()
1221                                    .map(entity_row_fields_snapshot)
1222                                    .filter(|fields| !fields.is_empty())
1223                                    .unwrap_or(document_values);
1224                                returning_field_snaps.push(fields);
1225                            }
1226                            entity_outputs.push(output);
1227                        }
1228                        InsertEntityType::Kv => {
1229                            let (kv_values, mut metadata) =
1230                                split_insert_metadata(self, &query.columns, row_values)?;
1231                            merge_with_clauses(
1232                                &mut metadata,
1233                                query.ttl_ms,
1234                                query.expires_at_ms,
1235                                &query.with_metadata,
1236                            );
1237                            let (columns, values) = pairwise_columns_values(&kv_values);
1238                            let key = find_column_value_string(&columns, &values, "key")?;
1239                            let value = find_column_value(&columns, &values, "value")?;
1240                            if query.returning.is_some() {
1241                                returning_field_snaps.push(kv_values.clone());
1242                            }
1243                            let input = CreateKvInput {
1244                                collection: query.table.clone(),
1245                                key,
1246                                value,
1247                                metadata,
1248                            };
1249                            entity_outputs.push(self.create_kv(input)?);
1250                        }
1251                    }
1252
1253                    inserted_count += 1;
1254                }
1255            }
1256
1257            if let Some(items) = query.returning.as_ref() {
1258                if !entity_outputs.is_empty() {
1259                    returning_result = Some(build_returning_result(
1260                        items,
1261                        &returning_field_snaps,
1262                        Some(&entity_outputs),
1263                    ));
1264                }
1265            }
1266        }
1267
1268        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
1269        if let Some(ref embed_config) = query.auto_embed {
1270            let store = self.inner.db.store();
1271            let provider = crate::ai::parse_provider(&embed_config.provider)?;
1272            let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
1273            let model = embed_config.model.clone().unwrap_or_else(|| {
1274                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1275                    .ok()
1276                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1277            });
1278
1279            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
1280            let manager = store
1281                .get_collection(&query.table)
1282                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1283            let entities = manager.query_all(|_| true);
1284            let recent: Vec<_> = entities
1285                .into_iter()
1286                .rev()
1287                .take(effective_rows.len())
1288                .collect();
1289
1290            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
1291            let entity_combos: Vec<(usize, String)> = recent
1292                .iter()
1293                .enumerate()
1294                .filter_map(|(i, entity)| {
1295                    if let EntityData::Row(ref row) = entity.data {
1296                        if let Some(ref named) = row.named {
1297                            let texts: Vec<String> = embed_config
1298                                .fields
1299                                .iter()
1300                                .filter_map(|field| match named.get(field) {
1301                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1302                                    _ => None,
1303                                })
1304                                .collect();
1305                            if !texts.is_empty() {
1306                                return Some((i, texts.join(" ")));
1307                            }
1308                        }
1309                    }
1310                    None
1311                })
1312                .collect();
1313
1314            if !entity_combos.is_empty() {
1315                // Batch phase: single provider round-trip for all rows.
1316                let batch_texts: Vec<String> =
1317                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
1318
1319                let batch_client =
1320                    crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1321
1322                let embeddings = match tokio::runtime::Handle::try_current() {
1323                    Ok(handle) => tokio::task::block_in_place(|| {
1324                        handle.block_on(batch_client.embed_batch(
1325                            &provider,
1326                            &model,
1327                            &api_key,
1328                            batch_texts,
1329                        ))
1330                    }),
1331                    Err(_) => {
1332                        return Err(RedDBError::Query(
1333                            "AUTO EMBED requires a Tokio runtime context".to_string(),
1334                        ));
1335                    }
1336                }
1337                .map_err(|e| RedDBError::Query(e.to_string()))?;
1338
1339                // Distribute phase: persist one vector per non-empty embedding.
1340                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1341                    if dense.is_empty() {
1342                        continue;
1343                    }
1344                    self.create_vector(CreateVectorInput {
1345                        collection: query.table.clone(),
1346                        dense,
1347                        content: Some(combined.clone()),
1348                        metadata: Vec::new(),
1349                        link_row: None,
1350                        link_node: None,
1351                    })?;
1352                }
1353            }
1354        }
1355
1356        if inserted_count > 0 {
1357            self.note_table_write(&query.table);
1358        }
1359
1360        let mut result = RuntimeQueryResult::dml_result(
1361            raw_query.to_string(),
1362            inserted_count,
1363            "insert",
1364            "runtime-dml",
1365        );
1366        if let Some(returning) = returning_result {
1367            result.result = returning;
1368        }
1369        Ok(result)
1370    }
1371
1372    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1373        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1374            return Ok(());
1375        };
1376        if !auth_store.iam_authorization_enabled() {
1377            return Ok(());
1378        }
1379        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1380            return Ok(());
1381        };
1382
1383        let tenant = crate::runtime::impl_core::current_tenant();
1384        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1385        let request = crate::auth::ColumnAccessRequest {
1386            action: "insert".to_string(),
1387            schema: None,
1388            table: query.table.clone(),
1389            columns: query.columns.clone(),
1390        };
1391        let ctx = crate::auth::policies::EvalContext {
1392            principal_tenant: tenant.clone(),
1393            current_tenant: tenant,
1394            peer_ip: None,
1395            mfa_present: false,
1396            now_ms: crate::auth::now_ms(),
1397            principal_is_admin_role: role == crate::auth::Role::Admin,
1398        };
1399
1400        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1401        let table_allowed = matches!(
1402            outcome.table_decision,
1403            crate::auth::policies::Decision::Allow { .. }
1404                | crate::auth::policies::Decision::AdminBypass
1405        );
1406        if !table_allowed {
1407            return Err(RedDBError::Query(format!(
1408                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1409                outcome.table_resource.kind, outcome.table_resource.name
1410            )));
1411        }
1412        if let Some(denied) = outcome.first_denied_column() {
1413            return Err(RedDBError::Query(format!(
1414                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1415                denied.resource.kind, denied.resource.name
1416            )));
1417        }
1418
1419        Ok(())
1420    }
1421
1422    pub(crate) fn insert_timeseries_point(
1423        &self,
1424        collection: &str,
1425        fields: Vec<(String, Value)>,
1426        mut metadata: Vec<(String, MetadataValue)>,
1427    ) -> RedDBResult<EntityId> {
1428        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1429
1430        let (columns, values) = pairwise_columns_values(&fields);
1431        validate_timeseries_insert_columns(&columns)?;
1432
1433        // Issue #577 — AnalyticsSchemaRegistry hook. If the row carries
1434        // an `event_name` whose schema is registered, validate the
1435        // `payload` JSON against it BEFORE any write side-effect. On
1436        // failure we return a typed error and the row is not
1437        // persisted. When no schema is registered for the event name
1438        // (or no `event_name` column is supplied at all) we fall
1439        // through to the normal write path for back-compat with
1440        // existing timeseries rows.
1441        let event_name_opt =
1442            find_column_value_opt_string(&columns, &values, "event_name");
1443        let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1444        if let Some(event_name) = event_name_opt.as_deref() {
1445            let store_for_schema = self.inner.db.store();
1446            if super::analytics_schema_registry::latest(
1447                store_for_schema.as_ref(),
1448                event_name,
1449            )
1450            .is_some()
1451            {
1452                let payload_json = payload_opt.as_deref().unwrap_or("{}");
1453                super::analytics_schema_registry::validate(
1454                    store_for_schema.as_ref(),
1455                    event_name,
1456                    payload_json,
1457                )
1458                .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1459            }
1460        }
1461
1462        // `metric` is required by the existing timeseries write path;
1463        // when an analytics-style row supplies `event_name` but not
1464        // `metric`, fall back to the event name so the storage path
1465        // still has a non-empty metric tag.
1466        let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1467            Some(m) => m,
1468            None => event_name_opt.clone().ok_or_else(|| {
1469                RedDBError::Query(
1470                    "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1471                )
1472            })?,
1473        };
1474        // `value` is optional for analytics-event rows (which are
1475        // semantically counts of 1); default to 1.0 when missing so
1476        // analytics inserts don't have to fabricate a metric value.
1477        let value = match find_column_value_opt_string(&columns, &values, "value") {
1478            Some(s) => s.parse::<f64>().unwrap_or(1.0),
1479            None => columns
1480                .iter()
1481                .position(|c| c.eq_ignore_ascii_case("value"))
1482                .and_then(|i| match &values[i] {
1483                    Value::Float(f) => Some(*f),
1484                    Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1485                    Value::UnsignedInteger(n) => Some(*n as f64),
1486                    _ => None,
1487                })
1488                .unwrap_or(1.0),
1489        };
1490        let timestamp_ns =
1491            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1492        let mut tags = find_timeseries_tags(&columns, &values)?;
1493        if let Some(ref name) = event_name_opt {
1494            tags.entry("event_name".to_string())
1495                .or_insert_with(|| name.clone());
1496        }
1497        if let Some(ref payload) = payload_opt {
1498            tags.entry("payload".to_string())
1499                .or_insert_with(|| payload.clone());
1500        }
1501
1502        let mut entity = UnifiedEntity::new(
1503            EntityId::new(0),
1504            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1505                series: collection.to_string(),
1506                metric: metric.clone(),
1507            })),
1508            EntityData::TimeSeries(crate::storage::TimeSeriesData {
1509                metric,
1510                timestamp_ns,
1511                value,
1512                tags,
1513            }),
1514        );
1515        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
1516        // or an autocommit xid (allocated and committed up-front so
1517        // future snapshots see the row as soon as it lands).
1518        let writer_xid = match self.current_xid() {
1519            Some(xid) => xid,
1520            None => {
1521                let mgr = self.snapshot_manager();
1522                let xid = mgr.begin();
1523                mgr.commit(xid);
1524                xid
1525            }
1526        };
1527        entity.set_xmin(writer_xid);
1528
1529        let store = self.inner.db.store();
1530        let id = store
1531            .insert_auto(collection, entity)
1532            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1533
1534        if !metadata.is_empty() {
1535            let _ = store.set_metadata(
1536                collection,
1537                id,
1538                Metadata::with_fields(metadata.into_iter().collect()),
1539            );
1540        }
1541
1542        self.cdc_emit(
1543            crate::replication::cdc::ChangeOperation::Insert,
1544            collection,
1545            id.raw(),
1546            "timeseries",
1547        );
1548
1549        Ok(id)
1550    }
1551
1552    /// Execute UPDATE table SET col=val, ... WHERE filter
1553    ///
1554    /// Scans the target collection, evaluates the WHERE filter against each
1555    /// record, and patches every matching entity.
1556    pub fn execute_update(
1557        &self,
1558        raw_query: &str,
1559        query: &UpdateQuery,
1560    ) -> RedDBResult<RuntimeQueryResult> {
1561        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1562        // Issue #523 — blockchain collections are immutable. Reject before
1563        // RLS / RETURNING work so the operator sees a clean 409-mapped
1564        // error instead of a partially-applied mutation surface.
1565        if crate::runtime::blockchain_kind::is_chain(
1566            self.inner.db.store().as_ref(),
1567            &query.table,
1568        ) {
1569            return Err(RedDBError::InvalidOperation(format!(
1570                "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1571                query.table
1572            )));
1573        }
1574        // CollectionContract gate (#50): runs the APPEND ONLY guard
1575        // (and any future contract bits) before RLS / RETURNING work
1576        // so the operator's immutability declaration is honoured
1577        // uniformly and the error message points at the DDL rather
1578        // than at a downstream symptom.
1579        crate::runtime::collection_contract::CollectionContractGate::check(
1580            self,
1581            &query.table,
1582            crate::runtime::collection_contract::MutationKind::Update,
1583        )?;
1584        ensure_update_target_contract(self, &query.table, query.target)?;
1585        ensure_graph_identity_update_target_allowed(query)?;
1586
1587        // Apply RLS augmentation first so every downstream path — plain
1588        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
1589        // same policy-filtered target set. This prevents RETURNING
1590        // from ever exposing rows the UPDATE policy would have
1591        // denied.
1592        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1593        let augmented_query: UpdateQuery;
1594        let effective_query: &UpdateQuery = if rls_gated {
1595            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1596                self,
1597                &query.table,
1598                crate::storage::query::ast::PolicyAction::Update,
1599            );
1600            let Some(policy) = rls_filter else {
1601                // No admitting policy: zero rows affected, empty
1602                // RETURNING (never leak rows the caller can't touch).
1603                let mut response = RuntimeQueryResult::dml_result(
1604                    raw_query.to_string(),
1605                    0,
1606                    "update",
1607                    "runtime-dml-rls",
1608                );
1609                if let Some(items) = query.returning.clone() {
1610                    response.result = build_returning_result(&items, &[], None);
1611                }
1612                return Ok(response);
1613            };
1614            let mut augmented = query.clone();
1615            augmented.filter = Some(match augmented.filter.take() {
1616                Some(existing) => {
1617                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1618                }
1619                None => policy,
1620            });
1621            augmented_query = augmented;
1622            &augmented_query
1623        } else {
1624            query
1625        };
1626
1627        // RETURNING wraps the inner executor and uses the touched-id
1628        // list the inner reports so the post-image reflects exactly
1629        // the rows the UPDATE actually mutated (not whatever a
1630        // separate SELECT might have observed).
1631        if let Some(items) = effective_query.returning.clone() {
1632            let mut inner_query = effective_query.clone();
1633            inner_query.returning = None;
1634            let (mut response, touched_ids) =
1635                self.execute_update_inner_tracked(raw_query, &inner_query)?;
1636
1637            let snapshots = if matches!(
1638                effective_query.target,
1639                UpdateTarget::Nodes | UpdateTarget::Edges
1640            ) {
1641                graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1642            } else {
1643                super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1644                    .row_snapshots(&touched_ids)
1645            };
1646
1647            response.result = build_returning_result(&items, &snapshots, None);
1648            response.engine = "runtime-dml-returning";
1649            return Ok(response);
1650        }
1651
1652        self.execute_update_inner(raw_query, effective_query)
1653    }
1654
1655    /// Back-compat shim: the older entry point ignored touched ids.
1656    fn execute_update_inner(
1657        &self,
1658        raw_query: &str,
1659        query: &UpdateQuery,
1660    ) -> RedDBResult<RuntimeQueryResult> {
1661        self.execute_update_inner_tracked(raw_query, query)
1662            .map(|(res, _)| res)
1663    }
1664
1665    fn execute_update_inner_tracked(
1666        &self,
1667        raw_query: &str,
1668        query: &UpdateQuery,
1669    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1670        let store = self.inner.db.store();
1671        let effective_filter = effective_update_filter(query);
1672        let compiled_plan = self.compile_update_plan(query)?;
1673        let mut touched_ids: Vec<EntityId> = Vec::new();
1674        let limit_cap = query.limit.map(|l| l as usize);
1675        let manager = store
1676            .get_collection(&query.table)
1677            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1678        let scan_limit = if query.order_by.is_empty() {
1679            limit_cap
1680        } else {
1681            None
1682        };
1683        let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
1684            self,
1685            &query.table,
1686            effective_filter.as_ref(),
1687            scan_limit,
1688            query.target,
1689        )
1690        .find_target_ids()?;
1691        let ids_to_update = if query.order_by.is_empty() {
1692            ids_to_update
1693        } else {
1694            ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1695        };
1696
1697        if update_needs_rmw_lock(query) {
1698            return self.execute_update_inner_tracked_locked(
1699                raw_query,
1700                query,
1701                &compiled_plan,
1702                &ids_to_update,
1703                effective_filter.as_ref(),
1704            );
1705        }
1706
1707        let mut affected: u64 = 0;
1708        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1709            let mut applied_chunk = Vec::with_capacity(chunk.len());
1710            for entity in manager.get_many(chunk).into_iter().flatten() {
1711                let assignments =
1712                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1713                let applied = self.apply_materialized_update_for_entity(
1714                    query.table.clone(),
1715                    entity,
1716                    &compiled_plan,
1717                    assignments,
1718                )?;
1719                touched_ids.push(applied.id);
1720                applied_chunk.push(applied);
1721            }
1722            self.persist_update_chunk(&applied_chunk)?;
1723            affected += applied_chunk.len() as u64;
1724            let lsns = self.flush_update_chunk(&applied_chunk)?;
1725            if !query.suppress_events {
1726                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1727            }
1728        }
1729
1730        if affected > 0 {
1731            self.note_table_write(&query.table);
1732        }
1733
1734        Ok((
1735            RuntimeQueryResult::dml_result(
1736                raw_query.to_string(),
1737                affected,
1738                "update",
1739                "runtime-dml",
1740            ),
1741            touched_ids,
1742        ))
1743    }
1744
1745    fn execute_update_inner_tracked_locked(
1746        &self,
1747        raw_query: &str,
1748        query: &UpdateQuery,
1749        compiled_plan: &CompiledUpdatePlan,
1750        ids_to_update: &[EntityId],
1751        effective_filter: Option<&Filter>,
1752    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1753        let store = self.inner.db.store();
1754        let mut touched_ids = Vec::new();
1755        let mut lock_entries = Vec::new();
1756
1757        for id in ids_to_update {
1758            let Some(candidate) = store.get(&query.table, *id) else {
1759                continue;
1760            };
1761            let logical_id = candidate.logical_id();
1762            let lock_key = format!("row:{}", logical_id.raw());
1763            let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1764            lock_entries.push((lock_key, logical_id, rmw_lock));
1765        }
1766
1767        lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1768        lock_entries.dedup_by(|left, right| left.0 == right.0);
1769        let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1770
1771        let mut applied_chunk = Vec::new();
1772        for (_, logical_id, _) in &lock_entries {
1773            let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1774            else {
1775                continue;
1776            };
1777            if let Some(filter) = effective_filter {
1778                if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1779                    Some(self.inner.db.as_ref()),
1780                    &entity,
1781                    filter,
1782                    &query.table,
1783                    &query.table,
1784                ) {
1785                    continue;
1786                }
1787            }
1788
1789            let assignments =
1790                self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1791            let applied = self.apply_materialized_update_for_entity(
1792                query.table.clone(),
1793                entity,
1794                compiled_plan,
1795                assignments,
1796            )?;
1797            touched_ids.push(applied.id);
1798            applied_chunk.push(applied);
1799        }
1800
1801        let affected = applied_chunk.len() as u64;
1802        if !applied_chunk.is_empty() {
1803            self.persist_update_chunk(&applied_chunk)?;
1804            let lsns = self.flush_update_chunk(&applied_chunk)?;
1805            if !query.suppress_events {
1806                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1807            }
1808        }
1809
1810        if affected > 0 {
1811            self.note_table_write(&query.table);
1812        }
1813
1814        Ok((
1815            RuntimeQueryResult::dml_result(
1816                raw_query.to_string(),
1817                affected,
1818                "update",
1819                "runtime-dml",
1820            ),
1821            touched_ids,
1822        ))
1823    }
1824
1825    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1826        let mut static_field_assignments = Vec::new();
1827        let mut static_metadata_assignments = Vec::new();
1828        let mut dynamic_assignments = Vec::new();
1829        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1830        let mut row_modified_columns = Vec::new();
1831
1832        for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1833            let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1834            let metadata_key = resolve_sql_ttl_metadata_key(column);
1835            if compound_op.is_some() && metadata_key.is_some() {
1836                return Err(RedDBError::Query(format!(
1837                    "compound assignment is only supported for row fields: {column}"
1838                )));
1839            }
1840            if compound_op.is_none() {
1841                if let Ok(value) = fold_expr_to_value(expr.clone()) {
1842                    if let Some(metadata_key) = metadata_key {
1843                        let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1844                        let (canonical_key, canonical_value) =
1845                            canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1846                        static_metadata_assignments
1847                            .push((canonical_key.to_string(), canonical_value));
1848                    } else {
1849                        let value = self.resolve_crypto_sentinel(value)?;
1850                        static_field_assignments.push((
1851                            column.clone(),
1852                            normalize_row_update_assignment_with_plan(
1853                                &query.table,
1854                                column,
1855                                value,
1856                                row_contract_plan.as_ref(),
1857                            )?,
1858                        ));
1859                        row_modified_columns.push(column.clone());
1860                    }
1861                    continue;
1862                }
1863            }
1864
1865            dynamic_assignments.push(CompiledUpdateAssignment {
1866                column: column.clone(),
1867                expr: expr.clone(),
1868                compound_op,
1869                metadata_key,
1870                row_rule: if metadata_key.is_none() {
1871                    if let Some(plan) = row_contract_plan.as_ref() {
1872                        if plan.timestamps_enabled
1873                            && (column == "created_at" || column == "updated_at")
1874                        {
1875                            return Err(RedDBError::Query(format!(
1876                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1877                                query.table, column
1878                            )));
1879                        }
1880                        if let Some(rule) = plan.declared_rules.get(column) {
1881                            Some(rule.clone())
1882                        } else if plan.strict_schema {
1883                            return Err(RedDBError::Query(format!(
1884                                "collection '{}' is strict and does not allow undeclared fields: {}",
1885                                query.table, column
1886                            )));
1887                        } else {
1888                            None
1889                        }
1890                    } else {
1891                        None
1892                    }
1893                } else {
1894                    None
1895                },
1896            });
1897            if metadata_key.is_none() {
1898                row_modified_columns.push(column.clone());
1899            }
1900        }
1901
1902        let row_modified_columns = dedupe_update_columns(row_modified_columns);
1903        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1904            row_modified_columns.iter().any(|column| {
1905                plan.unique_columns
1906                    .keys()
1907                    .any(|unique| unique.eq_ignore_ascii_case(column))
1908            })
1909        });
1910
1911        if let Some(ttl_ms) = query.ttl_ms {
1912            static_metadata_assignments
1913                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1914        }
1915        if let Some(expires_at_ms) = query.expires_at_ms {
1916            static_metadata_assignments.push((
1917                "_expires_at".to_string(),
1918                metadata_u64_to_value(expires_at_ms),
1919            ));
1920        }
1921        for (key, val) in &query.with_metadata {
1922            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1923        }
1924
1925        Ok(CompiledUpdatePlan {
1926            static_field_assignments,
1927            static_metadata_assignments,
1928            dynamic_assignments,
1929            row_contract_plan,
1930            row_modified_columns,
1931            row_touches_unique_columns,
1932        })
1933    }
1934
1935    fn materialize_update_assignments_for_entity(
1936        &self,
1937        query: &UpdateQuery,
1938        entity: &UnifiedEntity,
1939        compiled_plan: &CompiledUpdatePlan,
1940    ) -> RedDBResult<MaterializedUpdateAssignments> {
1941        let mut assignments = MaterializedUpdateAssignments::default();
1942        let mut record: Option<UnifiedRecord> = None;
1943
1944        for assignment in &compiled_plan.dynamic_assignments {
1945            if assignment.compound_op.is_some()
1946                && !matches!(
1947                    entity.data,
1948                    EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
1949                )
1950            {
1951                return Err(RedDBError::Query(format!(
1952                    "compound assignment is only supported for row or graph UPDATE column '{}'",
1953                    assignment.column
1954                )));
1955            }
1956            if record.is_none() {
1957                record = runtime_any_record_from_entity_ref(entity);
1958            }
1959            let Some(record) = record.as_ref() else {
1960                return Err(RedDBError::Query(format!(
1961                    "UPDATE could not materialize runtime record for entity {} in '{}'",
1962                    entity.id.raw(),
1963                    query.table
1964                )));
1965            };
1966            let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
1967                Some(self.inner.db.as_ref()),
1968                &assignment.expr,
1969                record,
1970                Some(query.table.as_str()),
1971                Some(query.table.as_str()),
1972            )
1973            .ok_or_else(|| {
1974                RedDBError::Query(format!(
1975                    "failed to evaluate UPDATE expression for column '{}'",
1976                    assignment.column
1977                ))
1978            })?;
1979            let value = if let Some(op) = assignment.compound_op {
1980                evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
1981            } else {
1982                rhs
1983            };
1984
1985            if let Some(metadata_key) = assignment.metadata_key {
1986                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1987                let (canonical_key, canonical_value) =
1988                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1989                assignments
1990                    .dynamic_metadata_assignments
1991                    .push((canonical_key.to_string(), canonical_value));
1992            } else {
1993                assignments.dynamic_field_assignments.push((
1994                    assignment.column.clone(),
1995                    normalize_row_update_value_for_rule(
1996                        &query.table,
1997                        self.resolve_crypto_sentinel(value)?,
1998                        assignment.row_rule.as_ref(),
1999                    )?,
2000                ));
2001            }
2002        }
2003
2004        Ok(assignments)
2005    }
2006
2007    fn apply_materialized_update_for_entity(
2008        &self,
2009        collection: String,
2010        entity: UnifiedEntity,
2011        compiled_plan: &CompiledUpdatePlan,
2012        assignments: MaterializedUpdateAssignments,
2013    ) -> RedDBResult<AppliedEntityMutation> {
2014        if matches!(entity.data, EntityData::Row(_)) {
2015            return self.apply_loaded_sql_update_row_core(
2016                collection,
2017                entity,
2018                &compiled_plan.static_field_assignments,
2019                assignments.dynamic_field_assignments,
2020                &compiled_plan.static_metadata_assignments,
2021                assignments.dynamic_metadata_assignments,
2022                compiled_plan.row_contract_plan.as_ref(),
2023                &compiled_plan.row_modified_columns,
2024                compiled_plan.row_touches_unique_columns,
2025            );
2026        }
2027
2028        ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2029
2030        let operations = build_patch_operations_from_materialized_assignments(
2031            &entity,
2032            compiled_plan,
2033            assignments,
2034        );
2035        self.apply_loaded_patch_entity_core(
2036            collection,
2037            entity,
2038            crate::json::Value::Null,
2039            operations,
2040        )
2041    }
2042
    /// Execute DELETE FROM table WHERE filter
    ///
    /// Gate order:
    /// 1. the DML write gate (`check_write`);
    /// 2. blockchain collections are rejected outright (immutable);
    /// 3. the `CollectionContractGate` for `MutationKind::Delete`.
    ///
    /// With a RETURNING clause, the pre-image is captured via an internal
    /// SELECT derived from `raw_query`. This method then re-enters itself with
    /// RETURNING stripped, and the captured rows are projected through the
    /// RETURNING items.
    ///
    /// Without RETURNING, Row-Level Security is applied when enabled. If the
    /// table has RLS but no applicable DELETE policy, a zero-row result is
    /// returned without deleting anything. Otherwise the policy predicate is
    /// AND-ed into the WHERE clause. The actual deletion happens in
    /// `execute_delete_inner`.
    ///
    /// # Errors
    /// - `RedDBError::InvalidOperation` when `query.table` is a blockchain
    ///   collection.
    /// - `RedDBError::Query` when RETURNING is present but `raw_query` cannot
    ///   be rewritten into a pre-image SELECT.
    /// - Any error from the write gate, the contract gate, the pre-image
    ///   query, or the delete itself.
    pub fn execute_delete(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // Issue #523 — blockchain collections are immutable; see
        // execute_update for the same gate.
        if crate::runtime::blockchain_kind::is_chain(
            self.inner.db.store().as_ref(),
            &query.table,
        ) {
            return Err(RedDBError::InvalidOperation(format!(
                "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
                query.table
            )));
        }
        // CollectionContract gate (#50) — see execute_update for
        // rationale. The gate handles APPEND ONLY rejection and is
        // the single point where future contract bits land.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Delete,
        )?;

        // RETURNING on DELETE: capture the pre-image via an internal
        // SELECT that reuses the same WHERE, then run the delete with
        // the RETURNING clause stripped, then project the captured
        // rows through the requested items. The extra SELECT is a
        // pragmatic MVP — a future pass can fuse the scan with the
        // delete to avoid the second pass over the heap.
        if let Some(items) = query.returning.clone() {
            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
                RedDBError::Query(
                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
                )
            })?;
            let captured = self.execute_query(&select_sql)?;

            let mut inner_query = query.clone();
            inner_query.returning = None;
            // Re-entering with RETURNING stripped runs the write, chain and
            // contract gates a second time and applies RLS there. The inner
            // result, including its affected count, is discarded.
            let _ = self.execute_delete(raw_query, &inner_query)?;

            let snapshots: Vec<Vec<(String, Value)>> = captured
                .result
                .records
                .iter()
                .map(|rec| {
                    rec.iter_fields()
                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
                        .collect()
                })
                .collect();
            // NOTE(review): `affected` counts pre-image rows, not rows the
            // inner DELETE removed. The pre-image goes through
            // `execute_query`, so it may be filtered differently from the
            // DELETE (e.g. SELECT vs DELETE RLS policies — not visible here).
            // If so, the reported rows/count can diverge from what was
            // deleted. TODO confirm.
            let affected = snapshots.len() as u64;
            let result = build_returning_result(&items, &snapshots, None);

            let mut response = RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "delete",
                "runtime-dml-returning",
            );
            response.result = result;
            return Ok(response);
        }
        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
        //
        // When the table has RLS enabled, gate the DELETE by the
        // per-role policy set: mutations only touch rows that *every*
        // matching `FOR DELETE` policy would accept. No policies =>
        // zero rows affected (PG restrictive-default).
        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Delete,
            );
            let Some(policy) = rls_filter else {
                return Ok(RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "delete",
                    "runtime-dml-rls",
                ));
            };
            // Fold the policy predicate into the user's WHERE before
            // dispatching — the remainder of this function reads the
            // filter from `query` via `effective_delete_filter`, which
            // respects the updated value.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            return self.execute_delete_inner(raw_query, &augmented);
        }
        self.execute_delete_inner(raw_query, query)
    }
2145
2146    fn execute_delete_inner(
2147        &self,
2148        raw_query: &str,
2149        query: &DeleteQuery,
2150    ) -> RedDBResult<RuntimeQueryResult> {
2151        let effective_filter = effective_delete_filter(query);
2152
2153        // Find the rows that match the WHERE clause. The "find target
2154        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
2155        // the same scan strategy.
2156        let scan = super::dml_target_scan::DmlTargetScan::new(
2157            self,
2158            &query.table,
2159            effective_filter.as_ref(),
2160            None,
2161        );
2162        let ids_to_delete = scan.find_target_ids()?;
2163
2164        // For event-enabled collections, snapshot the pre-delete state
2165        // before rows are physically removed.
2166        let needs_delete_events =
2167            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2168        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2169            scan.row_json_pre_images(&ids_to_delete)
2170        } else {
2171            HashMap::new()
2172        };
2173
2174        let mut affected: u64 = 0;
2175        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2176            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2177            affected += count;
2178            if needs_delete_events && !lsns.is_empty() {
2179                // lsns.len() == actually-deleted entities; align with chunk ids.
2180                // `delete_batch` may skip missing entities, so we correlate by
2181                // the number returned (they're emitted in chunk order).
2182                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2183                self.emit_delete_events_for_collection(
2184                    &query.table,
2185                    deleted_chunk,
2186                    &lsns,
2187                    &pre_images,
2188                )?;
2189            }
2190        }
2191        pre_images.clear();
2192
2193        if affected > 0 {
2194            self.note_table_write(&query.table);
2195        }
2196
2197        Ok(RuntimeQueryResult::dml_result(
2198            raw_query.to_string(),
2199            affected,
2200            "delete",
2201            "runtime-dml",
2202        ))
2203    }
2204}
2205
2206/// Reject UPDATE … NODES/EDGES that assign to graph identity/topology
2207/// columns regardless of whether any row matches the WHERE clause. The
2208/// per-entity guard below covers only the matched-rows case, but ADR 0019
2209/// declares these columns immutable on the surface itself, so a zero-row
2210/// UPDATE should still surface the same error to operators and SDKs.
2211fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2212    if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2213        return Ok(());
2214    }
2215    for (column, _) in &query.assignment_exprs {
2216        if is_immutable_graph_identity_field(column) {
2217            return Err(RedDBError::Query(format!(
2218                "immutable graph field '{column}' cannot be updated"
2219            )));
2220        }
2221    }
2222    Ok(())
2223}
2224
2225fn ensure_graph_identity_update_allowed(
2226    entity: &UnifiedEntity,
2227    compiled_plan: &CompiledUpdatePlan,
2228    assignments: &MaterializedUpdateAssignments,
2229) -> RedDBResult<()> {
2230    if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2231        return Ok(());
2232    }
2233
2234    for (column, _) in compiled_plan
2235        .static_field_assignments
2236        .iter()
2237        .chain(assignments.dynamic_field_assignments.iter())
2238    {
2239        if is_immutable_graph_identity_field(column) {
2240            return Err(RedDBError::Query(format!(
2241                "immutable graph field '{column}' cannot be updated"
2242            )));
2243        }
2244    }
2245
2246    Ok(())
2247}
2248
/// Whether `column` names a graph identity/topology field that UPDATE may
/// never change. The comparison is ASCII-case-insensitive.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const GRAPH_IDENTITY_COLUMNS: [&str; 6] = ["rid", "label", "from_rid", "to_rid", "from", "to"];

    GRAPH_IDENTITY_COLUMNS
        .iter()
        .copied()
        .any(|name| name.eq_ignore_ascii_case(column))
}
2254
2255fn build_patch_operations_from_materialized_assignments(
2256    entity: &UnifiedEntity,
2257    compiled_plan: &CompiledUpdatePlan,
2258    assignments: MaterializedUpdateAssignments,
2259) -> Vec<PatchEntityOperation> {
2260    let mut operations = Vec::with_capacity(
2261        compiled_plan.static_field_assignments.len()
2262            + compiled_plan.static_metadata_assignments.len()
2263            + assignments.dynamic_field_assignments.len()
2264            + assignments.dynamic_metadata_assignments.len(),
2265    );
2266
2267    for (column, value) in &compiled_plan.static_field_assignments {
2268        operations.push(PatchEntityOperation {
2269            op: PatchEntityOperationType::Set,
2270            path: update_patch_path_for_entity(entity, column),
2271            value: Some(storage_value_to_json(value)),
2272        });
2273    }
2274
2275    for (column, value) in assignments.dynamic_field_assignments {
2276        operations.push(PatchEntityOperation {
2277            op: PatchEntityOperationType::Set,
2278            path: update_patch_path_for_entity(entity, &column),
2279            value: Some(storage_value_to_json(&value)),
2280        });
2281    }
2282
2283    for (key, value) in &compiled_plan.static_metadata_assignments {
2284        operations.push(PatchEntityOperation {
2285            op: PatchEntityOperationType::Set,
2286            path: vec!["metadata".to_string(), key.clone()],
2287            value: Some(metadata_value_to_json(value)),
2288        });
2289    }
2290
2291    for (key, value) in assignments.dynamic_metadata_assignments {
2292        operations.push(PatchEntityOperation {
2293            op: PatchEntityOperationType::Set,
2294            path: vec!["metadata".to_string(), key],
2295            value: Some(metadata_value_to_json(&value)),
2296        });
2297    }
2298
2299    operations
2300}
2301
2302fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2303    if matches!(
2304        (&entity.kind, &entity.data),
2305        (
2306            crate::storage::EntityKind::GraphNode(_),
2307            EntityData::Node(_)
2308        )
2309    ) && column.eq_ignore_ascii_case("node_type")
2310    {
2311        return vec!["node_type".to_string()];
2312    }
2313    if matches!(
2314        (&entity.kind, &entity.data),
2315        (
2316            crate::storage::EntityKind::GraphEdge(_),
2317            EntityData::Edge(_)
2318        )
2319    ) && column.eq_ignore_ascii_case("weight")
2320    {
2321        return vec!["weight".to_string()];
2322    }
2323    vec!["fields".to_string(), column.to_string()]
2324}
2325
2326/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
2327/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
2328/// capture the pre-image before actually removing the rows. Returns
2329/// `None` when the input does not start with `DELETE`.
2330///
2331/// Case-insensitive on the keywords. Preserves everything between
2332/// the table name and the RETURNING clause, so WHERE / ORDER BY /
2333/// LIMIT survive untouched. The RETURNING tail — if present — is
2334/// truncated at the first top-level `RETURNING` token.
2335fn delete_to_select_sql(sql: &str) -> Option<String> {
2336    let trimmed = sql.trim_start();
2337    let lowered = trimmed.to_ascii_lowercase();
2338    if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
2339        return None;
2340    }
2341    // Find `FROM` after DELETE.
2342    let from_idx = lowered.find(" from ")?;
2343    let after_from = &trimmed[from_idx + " from ".len()..];
2344    let after_from_lc = &lowered[from_idx + " from ".len()..];
2345
2346    // Cut off the RETURNING tail (a naive search — the RETURNING
2347    // clause only appears once per statement at top level in our
2348    // grammar). Matches whitespace-bounded tokens to avoid clipping
2349    // `RETURNING` inside a string literal.
2350    let mut body = after_from.to_string();
2351    if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
2352        body.truncate(pos);
2353    }
2354    Some(format!("SELECT * FROM {}", body.trim_end()))
2355}
2356
/// Find the byte offset of a whitespace-bounded `needle` in a lowercased
/// `haystack`. Matches inside single-quoted string literals and double-quoted
/// spans (identifiers or string literals) are skipped.
///
/// A boundary is the start/end of `haystack` or an ASCII whitespace byte.
///
/// Quote handling is a simple open/close toggle:
/// - A doubled quote (`''` or `""`) closes and immediately reopens the span,
///   so SQL-style escaped quotes work. Backslash escapes are not recognised.
/// - A quote of the *other* kind inside an open span is literal, so `"it's"`
///   does not start a string.
///
/// For a non-empty ASCII `needle` the returned offset is always on a UTF-8
/// char boundary, because the match starts with an ASCII byte. Returns `None`
/// for an empty `needle`.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let pattern = needle.as_bytes();
    if pattern.is_empty() {
        return None;
    }
    let bytes = haystack.as_bytes();
    let mut open_quote: Option<u8> = None;

    for (i, &c) in bytes.iter().enumerate() {
        if let Some(quote) = open_quote {
            if c == quote {
                open_quote = None;
            }
            continue;
        }
        if c == b'\'' || c == b'"' {
            open_quote = Some(c);
            continue;
        }

        let starts_bounded = i == 0 || bytes[i - 1].is_ascii_whitespace();
        let ends_bounded = match bytes.get(i + pattern.len()) {
            Some(next) => next.is_ascii_whitespace(),
            None => true,
        };
        if starts_bounded && ends_bounded && bytes[i..].starts_with(pattern) {
            return Some(i);
        }
    }
    None
}
2385
2386/// Build a `UnifiedResult` from the rows affected by a DML statement plus
2387/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
2388/// for one affected row; `outputs`, when provided, supplies the engine-
2389/// assigned entity id for the same row (INSERT path). Projection honours
2390/// the RETURNING items: `*` expands to every snapshot column plus
2391/// the public row envelope when available.
2392fn build_returning_result(
2393    items: &[ReturningItem],
2394    snapshots: &[Vec<(String, Value)>],
2395    outputs: Option<&[CreateEntityOutput]>,
2396) -> UnifiedResult {
2397    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2398    let public_item_outputs = outputs.is_some_and(|outs| {
2399        outs.first()
2400            .and_then(|out| out.entity.as_ref())
2401            .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2402    });
2403
2404    let mut columns: Vec<String> = if project_all {
2405        let mut cols: Vec<String> = Vec::new();
2406        if public_item_outputs {
2407            cols.extend(
2408                [
2409                    "rid",
2410                    "collection",
2411                    "kind",
2412                    "tenant",
2413                    "created_at",
2414                    "updated_at",
2415                ]
2416                .into_iter()
2417                .map(str::to_string),
2418            );
2419        } else if outputs.is_some() {
2420            cols.push("red_entity_id".to_string());
2421        }
2422        if let Some(first) = snapshots.first() {
2423            for (name, _) in first {
2424                cols.push(name.clone());
2425            }
2426        }
2427        cols
2428    } else {
2429        items
2430            .iter()
2431            .filter_map(|it| match it {
2432                ReturningItem::Column(c) => Some(c.clone()),
2433                ReturningItem::All => None,
2434            })
2435            .collect()
2436    };
2437    // Guarantee unique order-preserving column list.
2438    {
2439        let mut seen = std::collections::HashSet::new();
2440        columns.retain(|c| seen.insert(c.clone()));
2441    }
2442
2443    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2444    for (idx, snap) in snapshots.iter().enumerate() {
2445        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2446        if let Some(outs) = outputs {
2447            if let Some(out) = outs.get(idx) {
2448                if let Some(entity) = out.entity.as_ref() {
2449                    if let Some(kind) = public_returning_item_kind(entity) {
2450                        values.insert(
2451                            Arc::clone(&sys_key_rid()),
2452                            Value::UnsignedInteger(out.id.raw()),
2453                        );
2454                        values.insert(
2455                            Arc::clone(&sys_key_collection()),
2456                            Value::text(entity.kind.collection().to_string()),
2457                        );
2458                        values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2459                        values.insert(
2460                            Arc::clone(&sys_key_created_at()),
2461                            Value::UnsignedInteger(entity.created_at),
2462                        );
2463                        values.insert(
2464                            Arc::clone(&sys_key_updated_at()),
2465                            Value::UnsignedInteger(entity.updated_at),
2466                        );
2467                    } else {
2468                        values.insert(
2469                            Arc::clone(&sys_key_red_entity_id()),
2470                            Value::Integer(out.id.raw() as i64),
2471                        );
2472                    }
2473                } else {
2474                    values.insert(
2475                        Arc::clone(&sys_key_red_entity_id()),
2476                        Value::Integer(out.id.raw() as i64),
2477                    );
2478                }
2479            }
2480        }
2481        for (name, val) in snap {
2482            values.insert(Arc::from(name.as_str()), val.clone());
2483        }
2484        if !values.contains_key("tenant") {
2485            let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2486            values.insert(Arc::clone(&sys_key_tenant()), tenant);
2487        }
2488        let mut rec = UnifiedRecord::default();
2489        // Only keep projected columns on the record.
2490        for col in &columns {
2491            if let Some(v) = values.get(col.as_str()) {
2492                rec.set_arc(Arc::from(col.as_str()), v.clone());
2493            }
2494        }
2495        records.push(rec);
2496    }
2497
2498    UnifiedResult {
2499        columns,
2500        records,
2501        stats: Default::default(),
2502        pre_serialized_json: None,
2503    }
2504}
2505
2506fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2507    match (&entity.kind, &entity.data) {
2508        (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2509            Some("node")
2510        }
2511        (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2512            Some("edge")
2513        }
2514        (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2515        _ => None,
2516    }
2517}
2518
2519fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2520    let Some(row) = entity.data.as_row() else {
2521        return "row";
2522    };
2523
2524    let is_kv = row.named.as_ref().is_some_and(|named| {
2525        (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2526            || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2527    });
2528    if is_kv {
2529        return "kv";
2530    }
2531
2532    let is_document = row
2533        .named
2534        .as_ref()
2535        .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2536        || row.columns.iter().any(runtime_returning_documentish_value);
2537    if is_document {
2538        "document"
2539    } else {
2540        "row"
2541    }
2542}
2543
2544fn runtime_returning_documentish_value(value: &Value) -> bool {
2545    matches!(value, Value::Json(_) | Value::Blob(_))
2546}
2547
2548fn row_insert_returning_snapshots(
2549    outputs: &[CreateEntityOutput],
2550    fallback: Vec<Vec<(String, Value)>>,
2551) -> Vec<Vec<(String, Value)>> {
2552    outputs
2553        .iter()
2554        .enumerate()
2555        .map(|(idx, out)| {
2556            out.entity
2557                .as_ref()
2558                .map(entity_row_fields_snapshot)
2559                .filter(|snap| !snap.is_empty())
2560                .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2561        })
2562        .collect()
2563}
2564
2565fn graph_insert_returning_snapshots(
2566    store: &crate::storage::unified::UnifiedStore,
2567    collection: &str,
2568    ids: &[EntityId],
2569) -> Vec<Vec<(String, Value)>> {
2570    let Some(manager) = store.get_collection(collection) else {
2571        return Vec::new();
2572    };
2573
2574    ids.iter()
2575        .filter_map(|id| manager.get(*id))
2576        .filter_map(|entity| {
2577            let mut record = runtime_any_record_from_entity_ref(&entity)?;
2578            record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2579            Some(record)
2580        })
2581        .map(|record| {
2582            record
2583                .iter_fields()
2584                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2585                .collect()
2586        })
2587        .collect()
2588}
2589
2590fn graph_update_returning_snapshots(
2591    runtime: &RedDBRuntime,
2592    collection: &str,
2593    ids: &[EntityId],
2594) -> Vec<Vec<(String, Value)>> {
2595    let store = runtime.db().store();
2596    let Some(manager) = store.get_collection(collection) else {
2597        return Vec::new();
2598    };
2599
2600    manager
2601        .get_many(ids)
2602        .into_iter()
2603        .flatten()
2604        .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2605        .map(|record| {
2606            record
2607                .iter_fields()
2608                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2609                .collect()
2610        })
2611        .collect()
2612}
2613
2614fn ensure_update_target_contract(
2615    runtime: &RedDBRuntime,
2616    collection: &str,
2617    target: UpdateTarget,
2618) -> RedDBResult<()> {
2619    let Some(contract) = runtime.db().collection_contract(collection) else {
2620        return Ok(());
2621    };
2622    if update_target_contract_is_advisory(&contract)
2623        || update_target_allows_model(contract.declared_model, update_target_model(target))
2624    {
2625        return Ok(());
2626    }
2627    Err(RedDBError::InvalidOperation(format!(
2628        "collection '{}' is declared as '{}' and does not allow '{}' updates",
2629        collection,
2630        update_model_name(contract.declared_model),
2631        update_model_name(update_target_model(target))
2632    )))
2633}
2634
2635fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2636    matches!(
2637        (&contract.origin, &contract.schema_mode),
2638        (
2639            crate::physical::ContractOrigin::Implicit,
2640            crate::catalog::SchemaMode::Dynamic,
2641        )
2642    )
2643}
2644
2645fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2646    match target {
2647        UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2648        UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2649        UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2650        UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2651    }
2652}
2653
2654fn update_target_allows_model(
2655    declared_model: crate::catalog::CollectionModel,
2656    requested_model: crate::catalog::CollectionModel,
2657) -> bool {
2658    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2659}
2660
2661fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2662    match model {
2663        crate::catalog::CollectionModel::Table => "table",
2664        crate::catalog::CollectionModel::Document => "document",
2665        crate::catalog::CollectionModel::Graph => "graph",
2666        crate::catalog::CollectionModel::Vector => "vector",
2667        crate::catalog::CollectionModel::Hll => "hll",
2668        crate::catalog::CollectionModel::Sketch => "sketch",
2669        crate::catalog::CollectionModel::Filter => "filter",
2670        crate::catalog::CollectionModel::Kv => "kv",
2671        crate::catalog::CollectionModel::Config => "config",
2672        crate::catalog::CollectionModel::Vault => "vault",
2673        crate::catalog::CollectionModel::Mixed => "mixed",
2674        crate::catalog::CollectionModel::TimeSeries => "timeseries",
2675        crate::catalog::CollectionModel::Queue => "queue",
2676        crate::catalog::CollectionModel::Metrics => "metrics",
2677    }
2678}
2679
2680fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2681    let db = runtime.db();
2682    if let Some(contract) = db.collection_contract(collection) {
2683        let advisory_implicit_dynamic = matches!(
2684            (&contract.origin, &contract.schema_mode),
2685            (
2686                crate::physical::ContractOrigin::Implicit,
2687                crate::catalog::SchemaMode::Dynamic,
2688            )
2689        );
2690        if advisory_implicit_dynamic
2691            || matches!(
2692                contract.declared_model,
2693                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2694            )
2695        {
2696            return Ok(());
2697        }
2698        return Err(RedDBError::InvalidOperation(format!(
2699            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2700            collection, contract.declared_model
2701        )));
2702    }
2703
2704    let now = std::time::SystemTime::now()
2705        .duration_since(std::time::UNIX_EPOCH)
2706        .unwrap_or_default()
2707        .as_millis();
2708    db.save_collection_contract(crate::physical::CollectionContract {
2709        name: collection.to_string(),
2710        declared_model: crate::catalog::CollectionModel::Graph,
2711        schema_mode: crate::catalog::SchemaMode::Dynamic,
2712        origin: crate::physical::ContractOrigin::Implicit,
2713        version: 1,
2714        created_at_unix_ms: now,
2715        updated_at_unix_ms: now,
2716        default_ttl_ms: db.collection_default_ttl_ms(collection),
2717        vector_dimension: None,
2718        vector_metric: None,
2719        context_index_fields: Vec::new(),
2720        declared_columns: Vec::new(),
2721        table_def: None,
2722        timestamps_enabled: false,
2723        context_index_enabled: false,
2724        metrics_raw_retention_ms: None,
2725        metrics_rollup_policies: Vec::new(),
2726        metrics_tenant_identity: None,
2727        metrics_namespace: None,
2728        append_only: false,
2729        subscriptions: Vec::new(),
2730        session_key: None,
2731        session_gap_ms: None,
2732        retention_duration_ms: None,
2733    })
2734    .map(|_| ())
2735    .map_err(|err| RedDBError::Internal(err.to_string()))
2736}
2737
2738fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2739    query
2740        .assignment_exprs
2741        .iter()
2742        .enumerate()
2743        .any(|(idx, (column, expr))| {
2744            query
2745                .compound_assignment_ops
2746                .get(idx)
2747                .is_some_and(|op| op.is_some())
2748                || expr_references_update_column(expr, &query.table, column)
2749        })
2750}
2751
2752fn evaluate_compound_update_assignment(
2753    column: &str,
2754    record: &UnifiedRecord,
2755    op: BinOp,
2756    rhs: Value,
2757) -> RedDBResult<Value> {
2758    let lhs = record.get(column).ok_or_else(|| {
2759        RedDBError::Query(format!(
2760            "compound assignment requires existing numeric field '{column}'"
2761        ))
2762    })?;
2763    if matches!(lhs, Value::Null) {
2764        return Err(RedDBError::Query(format!(
2765            "compound assignment requires non-null numeric field '{column}'"
2766        )));
2767    }
2768    apply_compound_numeric_op(column, op, lhs, &rhs)
2769}
2770
2771fn apply_compound_numeric_op(
2772    column: &str,
2773    op: BinOp,
2774    lhs: &Value,
2775    rhs: &Value,
2776) -> RedDBResult<Value> {
2777    let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2778        return Err(RedDBError::Query(format!(
2779            "compound assignment requires numeric field '{column}'"
2780        )));
2781    };
2782    let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2783        return Err(RedDBError::Query(format!(
2784            "compound assignment requires numeric right-hand value for field '{column}'"
2785        )));
2786    };
2787
2788    if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2789        let a = lhs_number.as_f64();
2790        let b = rhs_number.as_f64();
2791        let out = match op {
2792            BinOp::Add => a + b,
2793            BinOp::Sub => a - b,
2794            BinOp::Mul => a * b,
2795            BinOp::Div => {
2796                if b == 0.0 {
2797                    return Err(RedDBError::Query(format!(
2798                        "division by zero in compound assignment for field '{column}'"
2799                    )));
2800                }
2801                a / b
2802            }
2803            BinOp::Mod => {
2804                if b == 0.0 {
2805                    return Err(RedDBError::Query(format!(
2806                        "modulo by zero in compound assignment for field '{column}'"
2807                    )));
2808                }
2809                a % b
2810            }
2811            _ => {
2812                return Err(RedDBError::Query(format!(
2813                    "unsupported compound assignment operator for field '{column}'"
2814                )));
2815            }
2816        };
2817        if !out.is_finite() {
2818            return Err(RedDBError::Query(format!(
2819                "numeric overflow in compound assignment for field '{column}'"
2820            )));
2821        }
2822        return Ok(Value::Float(out));
2823    }
2824
2825    let a = lhs_number.as_i128();
2826    let b = rhs_number.as_i128();
2827    let out = match op {
2828        BinOp::Add => a.checked_add(b),
2829        BinOp::Sub => a.checked_sub(b),
2830        BinOp::Mul => a.checked_mul(b),
2831        BinOp::Mod => {
2832            if b == 0 {
2833                return Err(RedDBError::Query(format!(
2834                    "modulo by zero in compound assignment for field '{column}'"
2835                )));
2836            }
2837            a.checked_rem(b)
2838        }
2839        BinOp::Div => unreachable!("integer division is handled by the float branch"),
2840        _ => None,
2841    }
2842    .ok_or_else(|| {
2843        RedDBError::Query(format!(
2844            "numeric overflow in compound assignment for field '{column}'"
2845        ))
2846    })?;
2847
2848    if matches!(lhs, Value::UnsignedInteger(_)) {
2849        let value = u64::try_from(out).map_err(|_| {
2850            RedDBError::Query(format!(
2851                "numeric overflow in compound assignment for field '{column}'"
2852            ))
2853        })?;
2854        Ok(Value::UnsignedInteger(value))
2855    } else {
2856        let value = i64::try_from(out).map_err(|_| {
2857            RedDBError::Query(format!(
2858                "numeric overflow in compound assignment for field '{column}'"
2859            ))
2860        })?;
2861        Ok(Value::Integer(value))
2862    }
2863}
2864
2865#[derive(Clone, Copy)]
2866enum CompoundNumber {
2867    Integer(i128),
2868    Float(f64),
2869}
2870
2871impl CompoundNumber {
2872    fn from_value(value: &Value) -> Option<Self> {
2873        match value {
2874            Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2875            Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2876            Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2877            Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2878            _ => None,
2879        }
2880    }
2881
2882    fn is_float(self) -> bool {
2883        matches!(self, Self::Float(_))
2884    }
2885
2886    fn as_f64(self) -> f64 {
2887        match self {
2888            Self::Integer(value) => value as f64,
2889            Self::Float(value) => value,
2890        }
2891    }
2892
2893    fn as_i128(self) -> i128 {
2894        match self {
2895            Self::Integer(value) => value,
2896            Self::Float(_) => unreachable!("float compound number used as integer"),
2897        }
2898    }
2899}
2900
2901fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2902    match expr {
2903        Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2904        Expr::Column { field, .. } => {
2905            field_ref_matches_update_column(field, table_name, target_column)
2906        }
2907        Expr::BinaryOp { lhs, rhs, .. } => {
2908            expr_references_update_column(lhs, table_name, target_column)
2909                || expr_references_update_column(rhs, table_name, target_column)
2910        }
2911        Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2912            expr_references_update_column(operand, table_name, target_column)
2913        }
2914        Expr::FunctionCall { args, .. } => args
2915            .iter()
2916            .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2917        Expr::Case {
2918            branches, else_, ..
2919        } => {
2920            branches.iter().any(|(cond, value)| {
2921                expr_references_update_column(cond, table_name, target_column)
2922                    || expr_references_update_column(value, table_name, target_column)
2923            }) || else_
2924                .as_deref()
2925                .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2926        }
2927        Expr::IsNull { operand, .. } => {
2928            expr_references_update_column(operand, table_name, target_column)
2929        }
2930        Expr::InList { target, values, .. } => {
2931            expr_references_update_column(target, table_name, target_column)
2932                || values
2933                    .iter()
2934                    .any(|value| expr_references_update_column(value, table_name, target_column))
2935        }
2936        Expr::Between {
2937            target, low, high, ..
2938        } => {
2939            expr_references_update_column(target, table_name, target_column)
2940                || expr_references_update_column(low, table_name, target_column)
2941                || expr_references_update_column(high, table_name, target_column)
2942        }
2943    }
2944}
2945
2946fn field_ref_matches_update_column(
2947    field: &FieldRef,
2948    table_name: &str,
2949    target_column: &str,
2950) -> bool {
2951    match field {
2952        FieldRef::TableColumn { table, column } => {
2953            column.eq_ignore_ascii_case(target_column)
2954                && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2955        }
2956        FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2957            false
2958        }
2959    }
2960}
2961
2962fn resolve_update_entity_by_logical_id(
2963    runtime: &RedDBRuntime,
2964    table: &str,
2965    logical_id: EntityId,
2966) -> Option<UnifiedEntity> {
2967    let store = runtime.inner.db.store();
2968    if let Some(entity) = crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2969        .resolve_logical_id(&store, table, logical_id)
2970    {
2971        return Some(entity);
2972    }
2973    // Fallback for non-table-row entities (graph nodes/edges, etc.) where
2974    // entity_id == logical_id and the MVCC table-row resolver doesn't apply.
2975    store.get(table, logical_id)
2976}
2977
2978fn update_cdc_item_kind(
2979    runtime: &RedDBRuntime,
2980    collection: &str,
2981    entity: &UnifiedEntity,
2982) -> &'static str {
2983    match &entity.data {
2984        EntityData::Node(_) => return "node",
2985        EntityData::Edge(_) => return "edge",
2986        _ => {}
2987    }
2988
2989    match runtime
2990        .db()
2991        .collection_contract(collection)
2992        .map(|contract| contract.declared_model)
2993    {
2994        Some(crate::catalog::CollectionModel::Document) => "document",
2995        Some(crate::catalog::CollectionModel::Kv)
2996        | Some(crate::catalog::CollectionModel::Vault) => "kv",
2997        _ => "row",
2998    }
2999}
3000
3001fn ordered_update_target_ids(
3002    manager: &Arc<crate::storage::SegmentManager>,
3003    entity_ids: &[EntityId],
3004    order_by: &[OrderByClause],
3005    limit: Option<usize>,
3006) -> Vec<EntityId> {
3007    let mut entities: Vec<UnifiedEntity> =
3008        manager.get_many(entity_ids).into_iter().flatten().collect();
3009    entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3010    if let Some(limit) = limit {
3011        entities.truncate(limit);
3012    }
3013    entities.into_iter().map(|entity| entity.id).collect()
3014}
3015
3016fn compare_update_order(
3017    left: &UnifiedEntity,
3018    right: &UnifiedEntity,
3019    order_by: &[OrderByClause],
3020) -> Ordering {
3021    for clause in order_by {
3022        let left_value = update_order_value(left, &clause.field);
3023        let right_value = update_order_value(right, &clause.field);
3024        let ordering = compare_update_order_values(
3025            left_value.as_ref(),
3026            right_value.as_ref(),
3027            clause.nulls_first,
3028        );
3029        if ordering != Ordering::Equal {
3030            return if clause.ascending {
3031                ordering
3032            } else {
3033                ordering.reverse()
3034            };
3035        }
3036    }
3037    left.logical_id().raw().cmp(&right.logical_id().raw())
3038}
3039
3040fn compare_update_order_values(
3041    left: Option<&Value>,
3042    right: Option<&Value>,
3043    nulls_first: bool,
3044) -> Ordering {
3045    match (left, right) {
3046        (None, None) => Ordering::Equal,
3047        (None, Some(_)) => {
3048            if nulls_first {
3049                Ordering::Less
3050            } else {
3051                Ordering::Greater
3052            }
3053        }
3054        (Some(_), None) => {
3055            if nulls_first {
3056                Ordering::Greater
3057            } else {
3058                Ordering::Less
3059            }
3060        }
3061        (Some(left), Some(right)) => {
3062            crate::storage::query::value_compare::total_compare_values(left, right)
3063        }
3064    }
3065}
3066
3067fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3068    let FieldRef::TableColumn { table, column } = field else {
3069        return None;
3070    };
3071    if !table.is_empty() {
3072        return None;
3073    }
3074    if column.eq_ignore_ascii_case("rid") {
3075        return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3076    }
3077    match &entity.data {
3078        EntityData::Row(row) => row.get_field(column).cloned(),
3079        EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3080            .and_then(|record| record.get(column).cloned()),
3081        _ => None,
3082    }
3083}
3084
/// Remove case-insensitive duplicates from `columns`.
///
/// The first spelling of each name is kept, and the original order is
/// preserved.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let mut unique: Vec<String> = Vec::with_capacity(columns.len());
    for column in columns {
        let already_seen = unique
            .iter()
            .any(|kept| kept.eq_ignore_ascii_case(&column));
        if !already_seen {
            unique.push(column);
        }
    }
    unique
}
3101
3102// =============================================================================
3103// Helper functions for extracting typed values from column/value pairs
3104// =============================================================================
3105
/// Legacy column names an INSERT may use to carry TTL metadata.
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Map a SQL column name to its lower-case TTL metadata key.
///
/// Matching is case-insensitive. Returns `None` for ordinary columns.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|key| column.eq_ignore_ascii_case(key))
}
3119
3120/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
3121/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
3122/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
3123/// `_ttl_ms` and `_expires_at` are passed through.
3124fn canonicalize_sql_ttl_metadata(
3125    key: &'static str,
3126    value: MetadataValue,
3127) -> (&'static str, MetadataValue) {
3128    if key != "_ttl" {
3129        return (key, value);
3130    }
3131    let scaled = match value {
3132        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3133        MetadataValue::Timestamp(ms_or_s) => {
3134            // Timestamp is already chosen for very large values; treat as
3135            // already-ms to avoid silent overflow.
3136            MetadataValue::Timestamp(ms_or_s)
3137        }
3138        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3139        other => other,
3140    };
3141    ("_ttl_ms", scaled)
3142}
3143
/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
/// `SECRET('...')` literals. The runtime strips this marker and
/// applies the actual crypto transform during INSERT execution.
///
/// The marker is prepended to the `Value::Password` string and to the
/// `Value::Secret` byte payload. `RedDBRuntime::resolve_crypto_sentinel`
/// returns values that do not start with it unchanged.
pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3148
3149impl RedDBRuntime {
3150    /// Strip the plaintext sentinel from a `Value::Password` or
3151    /// `Value::Secret` produced by the parser and apply the real
3152    /// crypto transform. `Password` is always hashed with argon2id.
3153    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
3154    /// when `red.config.secret.auto_encrypt = true` (default).
3155    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3156        match value {
3157            Value::Password(marked) => {
3158                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3159                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
3160                } else {
3161                    Ok(Value::Password(marked))
3162                }
3163            }
3164            Value::Secret(bytes) => {
3165                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3166                    if !self.secret_auto_encrypt() {
3167                        return Err(RedDBError::Query(
3168                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
3169                             is false. Insert pre-encrypted bytes directly instead."
3170                                .to_string(),
3171                        ));
3172                    }
3173                    let key = self.secret_aes_key().ok_or_else(|| {
3174                        RedDBError::Query(
3175                            "SECRET() column encryption requires a bootstrapped \
3176                             vault (red.secret.aes_key is missing). Start the server \
3177                             with --vault to enable."
3178                                .to_string(),
3179                        )
3180                    })?;
3181                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3182                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3183                } else {
3184                    Ok(Value::Secret(bytes))
3185                }
3186            }
3187            other => Ok(other),
3188        }
3189    }
3190}
3191
3192/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
3193/// This is the on-disk representation of `Value::Secret`.
3194fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3195    let nonce_bytes = crate::auth::store::random_bytes(12);
3196    let mut nonce = [0u8; 12];
3197    nonce.copy_from_slice(&nonce_bytes[..12]);
3198    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3199    let mut out = Vec::with_capacity(12 + ct.len());
3200    out.extend_from_slice(&nonce);
3201    out.extend_from_slice(&ct);
3202    out
3203}
3204
3205/// Decode a `Value::Secret` payload back to plaintext. Returns
3206/// `None` when the payload is too short or AES-GCM authentication
3207/// fails (tampered or wrong key).
3208pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3209    if payload.len() < 12 {
3210        return None;
3211    }
3212    let mut nonce = [0u8; 12];
3213    nonce.copy_from_slice(&payload[..12]);
3214    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3215}
3216
3217fn split_insert_metadata(
3218    runtime: &RedDBRuntime,
3219    columns: &[String],
3220    values: &[Value],
3221) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3222    let mut fields = Vec::new();
3223    let mut metadata = Vec::new();
3224
3225    for (column, value) in columns.iter().zip(values.iter()) {
3226        // Still support legacy _ttl columns for backward compat
3227        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3228            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3229            let (canonical_key, canonical_value) =
3230                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3231            metadata.push((canonical_key.to_string(), canonical_value));
3232            continue;
3233        }
3234        fields.push((
3235            column.clone(),
3236            runtime.resolve_crypto_sentinel(value.clone())?,
3237        ));
3238    }
3239
3240    Ok((fields, metadata))
3241}
3242
3243/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
3244fn merge_with_clauses(
3245    metadata: &mut Vec<(String, MetadataValue)>,
3246    ttl_ms: Option<u64>,
3247    expires_at_ms: Option<u64>,
3248    with_metadata: &[(String, Value)],
3249) {
3250    if let Some(ms) = ttl_ms {
3251        metadata.push((
3252            "_ttl_ms".to_string(),
3253            if ms <= i64::MAX as u64 {
3254                MetadataValue::Int(ms as i64)
3255            } else {
3256                MetadataValue::Timestamp(ms)
3257            },
3258        ));
3259    }
3260    if let Some(ms) = expires_at_ms {
3261        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3262    }
3263    for (key, value) in with_metadata {
3264        let meta_value = match value {
3265            Value::Text(s) => MetadataValue::String(s.to_string()),
3266            Value::Integer(n) => MetadataValue::Int(*n),
3267            Value::Float(n) => MetadataValue::Float(*n),
3268            Value::Boolean(b) => MetadataValue::Bool(*b),
3269            _ => MetadataValue::String(value.to_string()),
3270        };
3271        metadata.push((key.clone(), meta_value));
3272    }
3273}
3274
3275fn merge_vector_metadata_column(
3276    metadata: &mut Vec<(String, MetadataValue)>,
3277    columns: &[String],
3278    values: &[Value],
3279) -> RedDBResult<()> {
3280    let Some(value) = columns
3281        .iter()
3282        .position(|column| column.eq_ignore_ascii_case("metadata"))
3283        .map(|index| &values[index])
3284    else {
3285        return Ok(());
3286    };
3287    let json = match value {
3288        Value::Null => return Ok(()),
3289        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3290            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3291        })?,
3292        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3293            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3294        })?,
3295        other => {
3296            return Err(RedDBError::Query(format!(
3297                "column 'metadata' expected JSON object, got {other:?}"
3298            )))
3299        }
3300    };
3301    let parsed = metadata_from_json(&json)?;
3302    for (key, value) in parsed.iter() {
3303        metadata.push((key.clone(), value.clone()));
3304    }
3305    Ok(())
3306}
3307
3308fn apply_collection_default_ttl_metadata(
3309    runtime: &RedDBRuntime,
3310    collection: &str,
3311    metadata: &mut Vec<(String, MetadataValue)>,
3312) {
3313    if has_internal_ttl_metadata(metadata) {
3314        return;
3315    }
3316
3317    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3318        return;
3319    };
3320
3321    metadata.push((
3322        "_ttl_ms".to_string(),
3323        if default_ttl_ms <= i64::MAX as u64 {
3324            MetadataValue::Int(default_ttl_ms as i64)
3325        } else {
3326            MetadataValue::Timestamp(default_ttl_ms)
3327        },
3328    ));
3329}
3330
3331fn ensure_non_tree_reserved_metadata_entries(
3332    metadata: &[(String, MetadataValue)],
3333) -> RedDBResult<()> {
3334    for (key, _) in metadata {
3335        ensure_non_tree_reserved_metadata_key(key)?;
3336    }
3337    Ok(())
3338}
3339
3340fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3341    if key.starts_with(TREE_METADATA_PREFIX) {
3342        return Err(RedDBError::Query(format!(
3343            "metadata key '{}' is reserved for managed trees",
3344            key
3345        )));
3346    }
3347    Ok(())
3348}
3349
3350fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3351    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3352        return Err(RedDBError::Query(format!(
3353            "edge label '{}' is reserved for managed trees",
3354            TREE_CHILD_EDGE_LABEL
3355        )));
3356    }
3357    Ok(())
3358}
3359
3360fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3361    let mut columns = Vec::with_capacity(pairs.len());
3362    let mut values = Vec::with_capacity(pairs.len());
3363
3364    for (column, value) in pairs {
3365        columns.push(column.clone());
3366        values.push(value.clone());
3367    }
3368
3369    (columns, values)
3370}
3371
3372/// Find a required column value and return it as-is.
3373fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3374    for (i, col) in columns.iter().enumerate() {
3375        if col.eq_ignore_ascii_case(name) {
3376            return Ok(values[i].clone());
3377        }
3378    }
3379    Err(RedDBError::Query(format!(
3380        "required column '{name}' not found in INSERT"
3381    )))
3382}
3383
3384/// Find a required column value and coerce to String.
3385fn find_column_value_string(
3386    columns: &[String],
3387    values: &[Value],
3388    name: &str,
3389) -> RedDBResult<String> {
3390    let val = find_column_value(columns, values, name)?;
3391    match val {
3392        Value::Text(s) => Ok(s.to_string()),
3393        Value::Integer(n) => Ok(n.to_string()),
3394        Value::Float(n) => Ok(n.to_string()),
3395        other => Err(RedDBError::Query(format!(
3396            "column '{name}' expected text, got {other:?}"
3397        ))),
3398    }
3399}
3400
3401fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3402    let val = find_column_value(columns, values, name)?;
3403    match val {
3404        Value::Float(n) => Ok(n),
3405        Value::Integer(n) => Ok(n as f64),
3406        Value::UnsignedInteger(n) => Ok(n as f64),
3407        Value::Text(s) => s
3408            .parse::<f64>()
3409            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3410        other => Err(RedDBError::Query(format!(
3411            "column '{name}' expected number, got {other:?}"
3412        ))),
3413    }
3414}
3415
3416/// Find an optional column value as String.
3417fn find_column_value_opt_string(
3418    columns: &[String],
3419    values: &[Value],
3420    name: &str,
3421) -> Option<String> {
3422    for (i, col) in columns.iter().enumerate() {
3423        if col.eq_ignore_ascii_case(name) {
3424            return match &values[i] {
3425                Value::Null => None,
3426                Value::Text(s) => Some(s.to_string()),
3427                Value::Integer(n) => Some(n.to_string()),
3428                Value::Float(n) => Some(n.to_string()),
3429                _ => None,
3430            };
3431        }
3432    }
3433    None
3434}
3435
3436/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
3437///
3438/// Accepts integer literals, decimal strings, and node labels resolved via
3439/// the per-collection graph label index (same source of truth that
3440/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
3441/// labels error so callers can fall back to the numeric id form.
3442fn resolve_edge_endpoint(
3443    store: &crate::storage::unified::UnifiedStore,
3444    collection: &str,
3445    columns: &[String],
3446    values: &[Value],
3447    name: &str,
3448) -> RedDBResult<u64> {
3449    let val = find_column_value(columns, values, name)?;
3450    match val {
3451        Value::Integer(n) => Ok(n as u64),
3452        Value::UnsignedInteger(n) => Ok(n),
3453        Value::Text(s) => {
3454            if let Ok(n) = s.parse::<u64>() {
3455                return Ok(n);
3456            }
3457            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3458            match matches.len() {
3459                0 => Err(RedDBError::Query(format!(
3460                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3461                ))),
3462                1 => Ok(matches[0].raw()),
3463                n => Err(RedDBError::Query(format!(
3464                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3465                ))),
3466            }
3467        }
3468        other => Err(RedDBError::Query(format!(
3469            "column '{name}' expected integer or node label, got {other:?}"
3470        ))),
3471    }
3472}
3473
3474fn resolve_edge_endpoint_any(
3475    store: &crate::storage::unified::UnifiedStore,
3476    collection: &str,
3477    columns: &[String],
3478    values: &[Value],
3479    names: &[&str],
3480) -> RedDBResult<u64> {
3481    for name in names {
3482        if columns
3483            .iter()
3484            .any(|column| column.eq_ignore_ascii_case(name))
3485        {
3486            return resolve_edge_endpoint(store, collection, columns, values, name);
3487        }
3488    }
3489
3490    Err(RedDBError::Query(format!(
3491        "required column '{}' not found in INSERT",
3492        names.first().copied().unwrap_or("from_rid")
3493    )))
3494}
3495
3496/// Find a required column value and coerce to u64.
3497fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3498    let val = find_column_value(columns, values, name)?;
3499    match val {
3500        Value::Integer(n) => Ok(n as u64),
3501        Value::UnsignedInteger(n) => Ok(n),
3502        Value::Text(s) => s
3503            .parse::<u64>()
3504            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3505        other => Err(RedDBError::Query(format!(
3506            "column '{name}' expected integer, got {other:?}"
3507        ))),
3508    }
3509}
3510
3511/// Find an optional column value as f32.
3512fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3513    for (i, col) in columns.iter().enumerate() {
3514        if col.eq_ignore_ascii_case(name) {
3515            return match &values[i] {
3516                Value::Float(n) => Some(*n as f32),
3517                Value::Integer(n) => Some(*n as f32),
3518                Value::Null => None,
3519                _ => None,
3520            };
3521        }
3522    }
3523    None
3524}
3525
3526/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
3527fn find_column_value_vec_f32(
3528    columns: &[String],
3529    values: &[Value],
3530    name: &str,
3531) -> RedDBResult<Vec<f32>> {
3532    let val = find_column_value(columns, values, name)?;
3533    match val {
3534        Value::Vector(v) => Ok(v),
3535        Value::Json(bytes) => {
3536            // Try to parse as JSON array of numbers
3537            let s = std::str::from_utf8(&bytes).map_err(|_| {
3538                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3539            })?;
3540            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3541                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3542            })?;
3543            Ok(arr)
3544        }
3545        other => Err(RedDBError::Query(format!(
3546            "column '{name}' expected vector, got {other:?}"
3547        ))),
3548    }
3549}
3550
3551fn find_column_value_vec_f32_any(
3552    columns: &[String],
3553    values: &[Value],
3554    names: &[&str],
3555) -> RedDBResult<Vec<f32>> {
3556    for name in names {
3557        if columns
3558            .iter()
3559            .any(|column| column.eq_ignore_ascii_case(name))
3560        {
3561            return find_column_value_vec_f32(columns, values, name);
3562        }
3563    }
3564    Err(RedDBError::Query(format!(
3565        "required vector column '{}' not found in INSERT",
3566        names.join("' or '")
3567    )))
3568}
3569
3570/// Extract remaining properties (all columns not in the exclusion list).
3571fn extract_remaining_properties(
3572    columns: &[String],
3573    values: &[Value],
3574    exclude: &[&str],
3575) -> Vec<(String, Value)> {
3576    columns
3577        .iter()
3578        .zip(values.iter())
3579        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3580        .map(|(col, val)| (col.clone(), val.clone()))
3581        .collect()
3582}
3583
3584fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3585    let mut invalid = Vec::new();
3586    for column in columns {
3587        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3588            invalid.push(column.clone());
3589        }
3590    }
3591
3592    if invalid.is_empty() {
3593        Ok(())
3594    } else {
3595        Err(RedDBError::Query(format!(
3596            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3597            invalid.join(", ")
3598        )))
3599    }
3600}
3601
/// Report whether `column` (compared case-insensitively) is one of the value
/// columns a timeseries INSERT accepts.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        // Analytics-event extension (#577): an analytics row carries
        // an `event_name` + JSON `payload`. The payload is validated
        // against the AnalyticsSchemaRegistry inside
        // `insert_timeseries_point` before the row lands.
        "event_name",
        "payload",
    ];
    ACCEPTED
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
3619
3620fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3621    let mut found = None;
3622
3623    for alias in ["timestamp_ns", "timestamp", "time"] {
3624        for (index, column) in columns.iter().enumerate() {
3625            if !column.eq_ignore_ascii_case(alias) {
3626                continue;
3627            }
3628
3629            if found.is_some() {
3630                return Err(RedDBError::Query(
3631                    "timeseries INSERT accepts only one timestamp column".to_string(),
3632                ));
3633            }
3634
3635            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3636        }
3637    }
3638
3639    Ok(found)
3640}
3641
3642fn find_timeseries_tags(
3643    columns: &[String],
3644    values: &[Value],
3645) -> RedDBResult<std::collections::HashMap<String, String>> {
3646    for (index, column) in columns.iter().enumerate() {
3647        if column.eq_ignore_ascii_case("tags") {
3648            return parse_timeseries_tags(&values[index]);
3649        }
3650    }
3651    Ok(std::collections::HashMap::new())
3652}
3653
3654fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3655    match value {
3656        Value::Null => Ok(std::collections::HashMap::new()),
3657        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3658        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3659        other => Err(RedDBError::Query(format!(
3660            "timeseries tags must be a JSON object or JSON text, got {other:?}"
3661        ))),
3662    }
3663}
3664
3665fn parse_timeseries_tags_json(
3666    bytes: &[u8],
3667) -> RedDBResult<std::collections::HashMap<String, String>> {
3668    let json: crate::json::Value = crate::json::from_slice(bytes)
3669        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3670
3671    let object = match json {
3672        crate::json::Value::Object(object) => object,
3673        other => {
3674            return Err(RedDBError::Query(format!(
3675                "timeseries tags must be a JSON object, got {other:?}"
3676            )))
3677        }
3678    };
3679
3680    let mut tags = std::collections::HashMap::with_capacity(object.len());
3681    for (key, value) in object {
3682        tags.insert(key, json_tag_value_to_string(&value));
3683    }
3684    Ok(tags)
3685}
3686
3687/// Encode a tag value for storage so the original JSON type can be
3688/// recovered on read (issue #543).
3689///
3690/// Time-series tags are stored as `HashMap<String, String>` on the
3691/// physical record (see [`crate::storage::TimeSeriesData`]) so that
3692/// the segment codec, WAL and gRPC mirrors don't need a new value
3693/// variant. To preserve the original JSON type across that
3694/// string-only channel we prepend the
3695/// [`crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX`] marker
3696/// and serialize the value as compact JSON text. The read paths
3697/// (`timeseries_tags_json_value` / `timeseries_tags_value`) detect
3698/// the marker, parse the suffix, and recover a real JSON value.
3699/// Tags written through other channels (Prometheus remote write,
3700/// metrics handlers, legacy on-disk data) lack the marker and are
3701/// returned as `JsonValue::String(raw)` exactly as before.
3702fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3703    let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3704    buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3705    buf.push_str(&value.to_string_compact());
3706    buf
3707}
3708
3709fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3710    match value {
3711        Value::UnsignedInteger(value) => Ok(*value),
3712        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3713        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3714        Value::Text(value) => value.parse::<u64>().map_err(|_| {
3715            RedDBError::Query(format!(
3716                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3717            ))
3718        }),
3719        other => Err(RedDBError::Query(format!(
3720            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3721        ))),
3722    }
3723}
3724
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// If the system clock is set before the epoch, this returns 0. The value
/// saturates at `u64::MAX` instead of wrapping.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
3732
3733fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3734    use crate::json::{Map, Value as JV};
3735    match value {
3736        MetadataValue::Null => JV::Null,
3737        MetadataValue::Bool(value) => JV::Bool(*value),
3738        MetadataValue::Int(value) => JV::Number(*value as f64),
3739        MetadataValue::Float(value) => JV::Number(*value),
3740        MetadataValue::String(value) => JV::String(value.clone()),
3741        MetadataValue::Bytes(value) => JV::Array(
3742            value
3743                .iter()
3744                .map(|value| JV::Number(*value as f64))
3745                .collect(),
3746        ),
3747        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3748        MetadataValue::Array(values) => {
3749            JV::Array(values.iter().map(metadata_value_to_json).collect())
3750        }
3751        MetadataValue::Object(object) => {
3752            let entries = object
3753                .iter()
3754                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3755                .collect();
3756            JV::Object(entries)
3757        }
3758        MetadataValue::Geo { lat, lon } => {
3759            let mut object = Map::new();
3760            object.insert("lat".to_string(), JV::Number(*lat));
3761            object.insert("lon".to_string(), JV::Number(*lon));
3762            JV::Object(object)
3763        }
3764        MetadataValue::Reference(target) => {
3765            let mut object = Map::new();
3766            object.insert(
3767                "collection".to_string(),
3768                JV::String(target.collection().to_string()),
3769            );
3770            object.insert(
3771                "entity_id".to_string(),
3772                JV::Number(target.entity_id().raw() as f64),
3773            );
3774            JV::Object(object)
3775        }
3776        MetadataValue::References(values) => {
3777            let refs = values
3778                .iter()
3779                .map(|target| {
3780                    let mut object = Map::new();
3781                    object.insert(
3782                        "collection".to_string(),
3783                        JV::String(target.collection().to_string()),
3784                    );
3785                    object.insert(
3786                        "entity_id".to_string(),
3787                        JV::Number(target.entity_id().raw() as f64),
3788                    );
3789                    JV::Object(object)
3790                })
3791                .collect();
3792            JV::Array(refs)
3793        }
3794    }
3795}
3796
3797fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3798    match value {
3799        Value::Null => MetadataValue::Null,
3800        Value::Boolean(value) => MetadataValue::Bool(*value),
3801        Value::Integer(value) => MetadataValue::Int(*value),
3802        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3803        Value::Float(value) => MetadataValue::Float(*value),
3804        Value::Text(value) => MetadataValue::String(value.to_string()),
3805        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3806        Value::Timestamp(value) => {
3807            if *value >= 0 {
3808                metadata_u64_to_value(*value as u64)
3809            } else {
3810                MetadataValue::Int(*value)
3811            }
3812        }
3813        Value::TimestampMs(value) => {
3814            if *value >= 0 {
3815                metadata_u64_to_value(*value as u64)
3816            } else {
3817                MetadataValue::Int(*value)
3818            }
3819        }
3820        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3821        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3822        Value::Date(value) => MetadataValue::String(value.to_string()),
3823        Value::Time(value) => MetadataValue::String(value.to_string()),
3824        Value::Decimal(value) => MetadataValue::String(value.to_string()),
3825        Value::Ipv4(value) => MetadataValue::String(format!(
3826            "{}.{}.{}.{}",
3827            (value >> 24) & 0xFF,
3828            (value >> 16) & 0xFF,
3829            (value >> 8) & 0xFF,
3830            value & 0xFF
3831        )),
3832        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3833        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3834        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3835        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3836            lat: *lat as f64 / 1_000_000.0,
3837            lon: *lon as f64 / 1_000_000.0,
3838        },
3839        Value::BigInt(value) => MetadataValue::String(value.to_string()),
3840        Value::TableRef(value) => MetadataValue::String(value.clone()),
3841        Value::PageRef(value) => MetadataValue::Int(*value as i64),
3842        Value::Password(value) => MetadataValue::String(value.clone()),
3843        Value::Array(values) => {
3844            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3845        }
3846        _ => MetadataValue::String(value.to_string()),
3847    }
3848}
3849
3850fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3851    match value {
3852        Value::Null => Ok(MetadataValue::Null),
3853        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3854        Value::Integer(_) => Err(RedDBError::Query(format!(
3855            "column '{field}' must be non-negative for TTL metadata"
3856        ))),
3857        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3858        Value::Float(value) if value.is_finite() => {
3859            if value.fract().abs() >= f64::EPSILON {
3860                return Err(RedDBError::Query(format!(
3861                    "column '{field}' must be an integer (TTL metadata must be an integer)"
3862                )));
3863            }
3864            if *value < 0.0 {
3865                return Err(RedDBError::Query(format!(
3866                    "column '{field}' must be non-negative for TTL metadata"
3867                )));
3868            }
3869            if *value > u64::MAX as f64 {
3870                return Err(RedDBError::Query(format!(
3871                    "column '{field}' value is too large"
3872                )));
3873            }
3874            Ok(metadata_u64_to_value(*value as u64))
3875        }
3876        Value::Float(_) => Err(RedDBError::Query(format!(
3877            "column '{field}' must be a finite number"
3878        ))),
3879        Value::Text(value) => {
3880            let value = value.trim();
3881            if let Ok(value) = value.parse::<u64>() {
3882                Ok(metadata_u64_to_value(value))
3883            } else if let Ok(value) = value.parse::<i64>() {
3884                if value < 0 {
3885                    return Err(RedDBError::Query(format!(
3886                        "column '{field}' must be non-negative for TTL metadata"
3887                    )));
3888                }
3889                Ok(metadata_u64_to_value(value as u64))
3890            } else if let Ok(value) = value.parse::<f64>() {
3891                if !value.is_finite() {
3892                    return Err(RedDBError::Query(format!(
3893                        "column '{field}' must be a finite number"
3894                    )));
3895                }
3896                if value.fract().abs() >= f64::EPSILON {
3897                    return Err(RedDBError::Query(format!(
3898                        "column '{field}' must be an integer (TTL metadata must be an integer)"
3899                    )));
3900                }
3901                if value < 0.0 {
3902                    return Err(RedDBError::Query(format!(
3903                        "column '{field}' must be non-negative for TTL metadata"
3904                    )));
3905                }
3906                if value > u64::MAX as f64 {
3907                    return Err(RedDBError::Query(format!(
3908                        "column '{field}' value is too large"
3909                    )));
3910                }
3911                Ok(metadata_u64_to_value(value as u64))
3912            } else {
3913                Err(RedDBError::Query(format!(
3914                    "column '{field}' expects a numeric value for TTL metadata"
3915                )))
3916            }
3917        }
3918        _ => Err(RedDBError::Query(format!(
3919            "column '{field}' expects a numeric value for TTL metadata"
3920        ))),
3921    }
3922}
3923
3924fn metadata_u64_to_value(value: u64) -> MetadataValue {
3925    if value <= i64::MAX as u64 {
3926        MetadataValue::Int(value as i64)
3927    } else {
3928        MetadataValue::Timestamp(value)
3929    }
3930}
3931
3932/// Phase 2 PG parity: inspect a column value and return `true` when
3933/// the dotted `tail` path is already present under it. Used by the
3934/// tenant auto-fill so rows that already carry an explicit value
3935/// (bulk import, admin insert on behalf of a tenant) are not
3936/// double-stamped with the session's current_tenant().
3937fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3938    let json = match value {
3939        Value::Null => return false,
3940        Value::Json(bytes) | Value::Blob(bytes) => {
3941            match crate::json::from_slice::<crate::json::Value>(bytes) {
3942                Ok(v) => v,
3943                Err(_) => return false,
3944            }
3945        }
3946        Value::Text(s) => {
3947            let trimmed = s.trim_start();
3948            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3949                return false;
3950            }
3951            match crate::json::from_str::<crate::json::Value>(s) {
3952                Ok(v) => v,
3953                Err(_) => return false,
3954            }
3955        }
3956        _ => return false,
3957    };
3958    let mut cursor = &json;
3959    for seg in tail.split('.') {
3960        match cursor {
3961            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3962                Some((_, v)) => cursor = v,
3963                None => return false,
3964            },
3965            _ => return false,
3966        }
3967    }
3968    !matches!(cursor, crate::json::Value::Null)
3969}
3970
/// Phase 2 PG parity: take a column value (possibly Null / Text /
/// Json / Blob) and return a `Value::Json` with the dotted `tail` path
/// set to `tenant_id`. Preserves every pre-existing key apart from the
/// final segment, which is overwritten.
///
/// Accepts:
/// * `Value::Null` → fresh object, with an intermediate object created for
///   every segment of `tail` (e.g. `a.b` → `{"a": {"b": tenant_id}}`)
/// * `Value::Json(bytes)` / `Value::Blob(bytes)` → parse as JSON,
///   navigate / create path, re-serialize
/// * `Value::Text(s)` → empty or whitespace-only text is treated as a
///   fresh object. Otherwise `s` must be valid JSON and is handled like Json
/// * anything else → error (user supplied a scalar where we need
///   a JSON container)
///
/// # Errors
/// Returns `RedDBError::Query` when:
/// * the input is not valid JSON;
/// * the root, or any existing intermediate value on the path, is not a
///   JSON object;
/// * the result cannot be re-serialized.
fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
    let mut root = match current {
        Value::Null => crate::json::Value::Object(Default::default()),
        Value::Json(bytes) | Value::Blob(bytes) => {
            crate::json::from_slice(&bytes).map_err(|err| {
                RedDBError::Query(format!(
                    "tenant auto-fill: root column is not valid JSON ({err})"
                ))
            })?
        }
        Value::Text(s) => {
            if s.trim().is_empty() {
                crate::json::Value::Object(Default::default())
            } else {
                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
                    RedDBError::Query(format!(
                        "tenant auto-fill: text root is not valid JSON ({err})"
                    ))
                })?
            }
        }
        other => {
            return Err(RedDBError::Query(format!(
                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
            )));
        }
    };

    // Navigate path segments, creating intermediate objects on demand.
    let segments: Vec<&str> = tail.split('.').collect();
    let mut cursor: &mut crate::json::Value = &mut root;
    for (i, seg) in segments.iter().enumerate() {
        let is_last = i + 1 == segments.len();
        // The current node must be an object for `seg` to be addressable.
        // This also rejects a non-object root (e.g. a JSON array).
        let map = match cursor {
            crate::json::Value::Object(m) => m,
            _ => {
                return Err(RedDBError::Query(format!(
                    "tenant auto-fill: segment '{seg}' is not inside an object"
                )));
            }
        };
        if is_last {
            // The final segment is (over)written with the tenant id.
            map.insert(
                seg.to_string(),
                crate::json::Value::String(tenant_id.to_string()),
            );
            break;
        }
        // Descend into the existing child, or create an empty object. An
        // existing non-object child is reported on the next iteration.
        cursor = map
            .entry(seg.to_string())
            .or_insert_with(|| crate::json::Value::Object(Default::default()));
    }

    let bytes = crate::json::to_vec(&root).map_err(|err| {
        RedDBError::Query(format!(
            "tenant auto-fill: failed to re-serialize JSON ({err})"
        ))
    })?;
    Ok(Value::Json(bytes))
}
4041
4042#[cfg(test)]
4043mod tests {
4044    use crate::storage::schema::Value;
4045    use crate::storage::wal::{WalReader, WalRecord};
4046    use crate::{RedDBOptions, RedDBRuntime};
4047    use std::path::Path;
4048
4049    fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4050        WalReader::open(wal_path)
4051            .expect("wal opens")
4052            .iter()
4053            .map(|record| record.expect("wal record decodes").1)
4054            .filter_map(|record| match record {
4055                WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4056                _ => None,
4057            })
4058            .collect()
4059    }
4060
4061    fn action_contains_text(action: &[u8], needle: &str) -> bool {
4062        action
4063            .windows(needle.len())
4064            .any(|window| window == needle.as_bytes())
4065    }
4066
4067    fn assert_statement_writes_collections_in_one_new_wal_batch(
4068        rt: &RedDBRuntime,
4069        wal_path: &Path,
4070        statement: &str,
4071        source: &str,
4072        event_queue: &str,
4073    ) {
4074        let before_batches = store_commit_batches(wal_path).len();
4075
4076        rt.execute_query(statement).unwrap();
4077
4078        let batches = store_commit_batches(wal_path);
4079        let statement_batches = &batches[before_batches..];
4080        let source_batch = statement_batches
4081            .iter()
4082            .position(|actions| {
4083                actions.iter().any(|action| {
4084                    action_contains_text(action, source)
4085                        && !action_contains_text(action, event_queue)
4086                })
4087            })
4088            .expect("source collection write batch is present");
4089        let event_batch = statement_batches
4090            .iter()
4091            .position(|actions| {
4092                actions
4093                    .iter()
4094                    .any(|action| action_contains_text(action, event_queue))
4095            })
4096            .expect("event queue write batch is present");
4097
4098        assert_eq!(
4099            source_batch, event_batch,
4100            "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4101        );
4102    }
4103
4104    #[test]
4105    fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4106        let dir = tempfile::tempdir().unwrap();
4107        let db_path = dir.path().join("events_dual_write.rdb");
4108        let wal_path = db_path.with_extension("rdb-uwal");
4109        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4110
4111        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4112            .unwrap();
4113        assert_statement_writes_collections_in_one_new_wal_batch(
4114            &rt,
4115            &wal_path,
4116            "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4117            "users",
4118            "users_events",
4119        );
4120    }
4121
4122    #[test]
4123    fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4124        let dir = tempfile::tempdir().unwrap();
4125        let db_path = dir.path().join("events_update_atomic.rdb");
4126        let wal_path = db_path.with_extension("rdb-uwal");
4127        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4128
4129        rt.execute_query(
4130            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4131        )
4132        .unwrap();
4133        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4134            .unwrap();
4135
4136        assert_statement_writes_collections_in_one_new_wal_batch(
4137            &rt,
4138            &wal_path,
4139            "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4140            "users",
4141            "user_updates",
4142        );
4143    }
4144
4145    #[test]
4146    fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4147        let dir = tempfile::tempdir().unwrap();
4148        let db_path = dir.path().join("events_delete_atomic.rdb");
4149        let wal_path = db_path.with_extension("rdb-uwal");
4150        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4151
4152        rt.execute_query(
4153            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4154        )
4155        .unwrap();
4156        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4157            .unwrap();
4158
4159        assert_statement_writes_collections_in_one_new_wal_batch(
4160            &rt,
4161            &wal_path,
4162            "DELETE FROM users WHERE id = 1",
4163            "users",
4164            "user_deletes",
4165        );
4166    }
4167
4168    #[test]
4169    fn update_where_id_in_with_hash_index_updates_expected_rows() {
4170        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4171        rt.execute_query("CREATE TABLE users (id INT, score INT)")
4172            .unwrap();
4173        for id in 0..5 {
4174            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4175                .unwrap();
4176        }
4177        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4178            .unwrap();
4179
4180        let updated = rt
4181            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4182            .unwrap();
4183        assert_eq!(updated.affected_rows, 3);
4184
4185        let selected = rt
4186            .execute_query("SELECT id, score FROM users ORDER BY id")
4187            .unwrap();
4188        let scores: Vec<(i64, i64)> = selected
4189            .result
4190            .records
4191            .iter()
4192            .map(|record| {
4193                let id = match record.get("id").unwrap() {
4194                    Value::Integer(value) => *value,
4195                    other => panic!("expected integer id, got {other:?}"),
4196                };
4197                let score = match record.get("score").unwrap() {
4198                    Value::Integer(value) => *value,
4199                    other => panic!("expected integer score, got {other:?}"),
4200                };
4201                (id, score)
4202            })
4203            .collect();
4204        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4205    }
4206
4207    /// Drives UPDATE through the shared `DmlTargetScan` module — the
4208    /// same code path DELETE uses (#51, #52). Exercises the indexed
4209    /// equality fast-path (WHERE id = N with a HASH index), the
4210    /// unindexed range scan (WHERE score > N), and the no-WHERE
4211    /// full-scan branch to confirm the extracted "find target rows"
4212    /// loop preserves affected-row counts and the resulting row state.
4213    #[test]
4214    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4215        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4216        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4217            .unwrap();
4218        for id in 0..5 {
4219            rt.execute_query(&format!(
4220                "INSERT INTO items (id, score) VALUES ({id}, {})",
4221                id * 10
4222            ))
4223            .unwrap();
4224        }
4225        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4226            .unwrap();
4227
4228        // Indexed equality UPDATE — hits the hash fast-path inside
4229        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
4230        // below the score>25 cutoff so the next assertion stays clean.
4231        let updated_one = rt
4232            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4233            .unwrap();
4234        assert_eq!(updated_one.affected_rows, 1);
4235
4236        // Unindexed scan UPDATE — bumps everyone with score > 25,
4237        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4238        // zoned/full-scan branch.
4239        let updated_many = rt
4240            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4241            .unwrap();
4242        assert_eq!(updated_many.affected_rows, 2);
4243
4244        let snapshot = rt
4245            .execute_query("SELECT id, score FROM items ORDER BY id")
4246            .unwrap();
4247        let pairs: Vec<(i64, i64)> = snapshot
4248            .result
4249            .records
4250            .iter()
4251            .map(|record| {
4252                let id = match record.get("id").unwrap() {
4253                    Value::Integer(value) => *value,
4254                    other => panic!("expected integer id, got {other:?}"),
4255                };
4256                let score = match record.get("score").unwrap() {
4257                    Value::Integer(value) => *value,
4258                    other => panic!("expected integer score, got {other:?}"),
4259                };
4260                (id, score)
4261            })
4262            .collect();
4263        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4264
4265        // Full-scan UPDATE with no WHERE rewrites every remaining row.
4266        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4267        assert_eq!(updated_all.affected_rows, 5);
4268        let after = rt
4269            .execute_query("SELECT score FROM items ORDER BY id")
4270            .unwrap();
4271        let scores: Vec<i64> = after
4272            .result
4273            .records
4274            .iter()
4275            .map(|record| match record.get("score").unwrap() {
4276                Value::Integer(value) => *value,
4277                other => panic!("expected integer score, got {other:?}"),
4278            })
4279            .collect();
4280        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4281    }
4282
4283    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
4284    /// both the index fast-path (WHERE id = N with a HASH index) and
4285    /// the unindexed scan path (WHERE score > N) to confirm the
4286    /// extracted "find target rows" loop preserves the affected-row
4287    /// count and which rows survive.
4288    #[test]
4289    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4290        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4291        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4292            .unwrap();
4293        for id in 0..5 {
4294            rt.execute_query(&format!(
4295                "INSERT INTO items (id, score) VALUES ({id}, {})",
4296                id * 10
4297            ))
4298            .unwrap();
4299        }
4300        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4301            .unwrap();
4302
4303        // Indexed equality DELETE — hits the hash fast-path inside
4304        // DmlTargetScan::find_target_ids.
4305        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4306        assert_eq!(deleted_one.affected_rows, 1);
4307
4308        // Unindexed scan DELETE — drops everyone with score > 25,
4309        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4310        // zoned/full-scan branch.
4311        let deleted_many = rt
4312            .execute_query("DELETE FROM items WHERE score > 25")
4313            .unwrap();
4314        assert_eq!(deleted_many.affected_rows, 2);
4315
4316        let surviving = rt
4317            .execute_query("SELECT id FROM items ORDER BY id")
4318            .unwrap();
4319        let ids: Vec<i64> = surviving
4320            .result
4321            .records
4322            .iter()
4323            .map(|record| match record.get("id").unwrap() {
4324                Value::Integer(value) => *value,
4325                other => panic!("expected integer id, got {other:?}"),
4326            })
4327            .collect();
4328        assert_eq!(ids, vec![0, 1]);
4329
4330        // Sanity: full-scan DELETE with no WHERE clears the rest.
4331        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4332        assert_eq!(deleted_rest.affected_rows, 2);
4333        let empty = rt.execute_query("SELECT id FROM items").unwrap();
4334        assert!(empty.result.records.is_empty());
4335    }
4336
4337    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
4338    /// INSERT but reject UPDATE and DELETE with the documented
4339    /// operator-facing error strings. Drives all three DML verbs so
4340    /// the centralized gate is exercised end-to-end.
4341    #[test]
4342    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4343        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4344        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4345            .unwrap();
4346
4347        // INSERT must succeed — APPEND ONLY exists precisely to allow
4348        // appends. The gate should be a no-op for INSERT.
4349        let inserted = rt
4350            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4351            .unwrap();
4352        assert_eq!(inserted.affected_rows, 1);
4353
4354        // UPDATE is rejected with the gate's UPDATE-specific message.
4355        let update_err = rt
4356            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4357            .unwrap_err();
4358        let msg = format!("{update_err}");
4359        assert!(
4360            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4361            "expected UPDATE rejection message, got: {msg}"
4362        );
4363
4364        // DELETE is rejected with the gate's DELETE-specific message.
4365        let delete_err = rt
4366            .execute_query("DELETE FROM events WHERE id = 1")
4367            .unwrap_err();
4368        let msg = format!("{delete_err}");
4369        assert!(
4370            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4371            "expected DELETE rejection message, got: {msg}"
4372        );
4373
4374        // Row should still be present — neither rejected mutation
4375        // touched storage.
4376        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4377        assert_eq!(surviving.result.records.len(), 1);
4378    }
4379
4380    /// CollectionContract gate: tables without an APPEND ONLY contract
4381    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
4382    /// is a true pass-through, not an accidental block.
4383    #[test]
4384    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4385        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4386        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4387            .unwrap();
4388
4389        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4390            .unwrap();
4391        let updated = rt
4392            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4393            .unwrap();
4394        assert_eq!(updated.affected_rows, 1);
4395        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4396        assert_eq!(deleted.affected_rows, 1);
4397    }
4398
4399    #[test]
4400    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4401        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4402        rt.execute_query(
4403            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4404        )
4405        .unwrap();
4406
4407        let inserted = rt
4408            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4409            .unwrap();
4410        assert_eq!(inserted.affected_rows, 1);
4411
4412        let events = queue_payloads(&rt, "audit_log");
4413        assert_eq!(events.len(), 1);
4414        let event = events[0].as_object().expect("event payload object");
4415        assert!(event
4416            .get("event_id")
4417            .and_then(crate::json::Value::as_str)
4418            .is_some_and(|value| !value.is_empty()));
4419        assert_eq!(
4420            event.get("op").and_then(crate::json::Value::as_str),
4421            Some("insert")
4422        );
4423        assert_eq!(
4424            event.get("collection").and_then(crate::json::Value::as_str),
4425            Some("users")
4426        );
4427        assert_eq!(
4428            event.get("id").and_then(crate::json::Value::as_u64),
4429            Some(7)
4430        );
4431        assert!(event
4432            .get("ts")
4433            .and_then(crate::json::Value::as_u64)
4434            .is_some());
4435        assert!(event
4436            .get("lsn")
4437            .and_then(crate::json::Value::as_u64)
4438            .is_some());
4439        assert!(matches!(
4440            event.get("tenant"),
4441            Some(crate::json::Value::Null)
4442        ));
4443        assert!(matches!(
4444            event.get("before"),
4445            Some(crate::json::Value::Null)
4446        ));
4447        let after = event
4448            .get("after")
4449            .and_then(crate::json::Value::as_object)
4450            .expect("after object");
4451        assert_eq!(
4452            after.get("id").and_then(crate::json::Value::as_u64),
4453            Some(7)
4454        );
4455        assert_eq!(
4456            after.get("email").and_then(crate::json::Value::as_str),
4457            Some("a@example.com")
4458        );
4459    }
4460
4461    #[test]
4462    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4463        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4464        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4465            .unwrap();
4466
4467        rt.execute_query(
4468            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4469        )
4470        .unwrap();
4471
4472        let events = queue_payloads(&rt, "users_events");
4473        assert_eq!(events.len(), 2);
4474        let mut previous_lsn = 0;
4475        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4476            let object = event.as_object().expect("event payload object");
4477            assert_eq!(
4478                object.get("op").and_then(crate::json::Value::as_str),
4479                Some("insert")
4480            );
4481            assert_eq!(
4482                object.get("id").and_then(crate::json::Value::as_u64),
4483                Some(expected_id)
4484            );
4485            let lsn = object
4486                .get("lsn")
4487                .and_then(crate::json::Value::as_u64)
4488                .expect("event lsn");
4489            assert!(
4490                lsn > previous_lsn,
4491                "event LSNs should increase in row order"
4492            );
4493            previous_lsn = lsn;
4494            let after = object
4495                .get("after")
4496                .and_then(crate::json::Value::as_object)
4497                .expect("after object");
4498            assert_eq!(
4499                after.get("id").and_then(crate::json::Value::as_u64),
4500                Some(expected_id)
4501            );
4502        }
4503    }
4504
4505    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4506        let result = rt
4507            .execute_query(&format!("QUEUE PEEK {queue} 10"))
4508            .expect("peek queue");
4509        result
4510            .result
4511            .records
4512            .iter()
4513            .map(
4514                |record| match record.get("payload").expect("payload column") {
4515                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4516                    other => panic!("expected JSON queue payload, got {other:?}"),
4517                },
4518            )
4519            .collect()
4520    }
4521
4522    // ── #112: auto-index user `id` on first insert ─────────────────────
4523
4524    /// First insert into a fresh collection that carries a column named
4525    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
4526    /// populate it transparently, and `WHERE id = N` lookups exercise
4527    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
4528    ///
4529    /// This is the load-bearing acceptance test for #112 — without the
4530    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
4531    /// fall through to a full segment scan (the 4× perf gap documented
4532    /// in `docs/perf/delete-sequential-2026-05-06.md`).
4533    #[test]
4534    fn auto_index_id_fires_on_first_insert() {
4535        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4536        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4537            .unwrap();
4538
4539        // Pre-condition: no index on `id` yet.
4540        assert!(
4541            rt.index_store_ref()
4542                .find_index_for_column("bench_users", "id")
4543                .is_none(),
4544            "freshly created collection should not have an `id` index"
4545        );
4546
4547        // Single-row INSERT — drives `MutationEngine::append_one`.
4548        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4549            .unwrap();
4550
4551        // Post-condition: hash index registered on `id`.
4552        let registered = rt
4553            .index_store_ref()
4554            .find_index_for_column("bench_users", "id")
4555            .expect("auto-index hook should have registered idx_id on first insert");
4556        assert_eq!(registered.name, "idx_id");
4557        assert_eq!(registered.collection, "bench_users");
4558        assert_eq!(registered.columns, vec!["id".to_string()]);
4559        assert!(matches!(
4560            registered.method,
4561            super::super::index_store::IndexMethodKind::Hash
4562        ));
4563
4564        // Subsequent inserts populate the index; `WHERE id = N` should
4565        // resolve via the hash fast path and round-trip every row.
4566        for id in 2..=5 {
4567            rt.execute_query(&format!(
4568                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4569                id * 10
4570            ))
4571            .unwrap();
4572        }
4573        for id in 1..=5 {
4574            let result = rt
4575                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4576                .unwrap();
4577            assert_eq!(
4578                result.result.records.len(),
4579                1,
4580                "id={id} should match one row"
4581            );
4582        }
4583
4584        // Delete via the hash fast-path — exactly the bench scenario the
4585        // perf doc identified as the 4× regression. With the index
4586        // present, `find_target_ids` short-circuits before
4587        // `for_each_entity_zoned` runs.
4588        let deleted = rt
4589            .execute_query("DELETE FROM bench_users WHERE id = 3")
4590            .unwrap();
4591        assert_eq!(deleted.affected_rows, 1);
4592    }
4593
4594    /// Bulk INSERT (the multi-row VALUES path) drives
4595    /// `MutationEngine::append_batch`. The hook must fire there too —
4596    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
4597    /// wire bulk INSERT) skip auto-indexing entirely.
4598    #[test]
4599    fn auto_index_id_fires_on_first_bulk_insert() {
4600        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4601        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4602            .unwrap();
4603
4604        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4605            .unwrap();
4606
4607        let registered = rt
4608            .index_store_ref()
4609            .find_index_for_column("bench_bulk", "id")
4610            .expect("auto-index hook should fire on first bulk insert");
4611        assert_eq!(registered.name, "idx_id");
4612
4613        // Every row populated via `index_entity_insert_batch`.
4614        for id in 1..=3 {
4615            let result = rt
4616                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4617                .unwrap();
4618            assert_eq!(result.result.records.len(), 1);
4619        }
4620    }
4621
4622    /// Hook is a no-op when the row carries no `id` column. Conservative
4623    /// match (case-sensitive `id`) — `Id`, `ID`, and `red_entity_id`
4624    /// don't trigger it.
4625    #[test]
4626    fn auto_index_id_skips_when_no_id_column() {
4627        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4628        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4629            .unwrap();
4630        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4631            .unwrap();
4632
4633        assert!(rt
4634            .index_store_ref()
4635            .find_index_for_column("plain", "id")
4636            .is_none());
4637        assert!(rt
4638            .index_store_ref()
4639            .find_index_for_column("plain", "uid")
4640            .is_none());
4641    }
4642
4643    /// Hook only fires once per collection. If an explicit
4644    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
4645    /// detects it via `find_index_for_column` and does NOT clobber it
4646    /// with a HASH index on the next insert.
4647    #[test]
4648    fn auto_index_id_skips_when_index_already_exists() {
4649        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4650        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4651            .unwrap();
4652        // User-declared BTREE index on `id` before any insert.
4653        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4654            .unwrap();
4655        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4656            .unwrap();
4657
4658        let registered = rt
4659            .index_store_ref()
4660            .find_index_for_column("pre", "id")
4661            .expect("user index should still be there");
4662        assert_eq!(
4663            registered.name, "user_idx",
4664            "auto-index hook must not overwrite an existing index"
4665        );
4666    }
4667
4668    /// Implicit `idx_id` is reaped when the collection drops. The
4669    /// existing `execute_drop_table` walks `list_indices` and drops every
4670    /// entry — confirm the auto-created index participates.
4671    #[test]
4672    fn auto_index_id_dropped_with_collection() {
4673        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4674        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4675            .unwrap();
4676        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4677            .unwrap();
4678        assert!(rt
4679            .index_store_ref()
4680            .find_index_for_column("ephemeral", "id")
4681            .is_some());
4682
4683        rt.execute_query("DROP TABLE ephemeral").unwrap();
4684
4685        assert!(
4686            rt.index_store_ref()
4687                .find_index_for_column("ephemeral", "id")
4688                .is_none(),
4689            "implicit `idx_id` must be reaped when its collection drops"
4690        );
4691    }
4692
4693    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
4694    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
4695    /// off, first insert leaves the collection without an `id` index —
4696    /// DELETE/UPDATE fall back to the scan path.
4697    #[test]
4698    fn auto_index_id_disabled_by_config() {
4699        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4700        let rt = RedDBRuntime::with_options(opts).unwrap();
4701
4702        rt.execute_query("CREATE TABLE off (id INT, score INT)")
4703            .unwrap();
4704        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4705            .unwrap();
4706
4707        assert!(
4708            rt.index_store_ref()
4709                .find_index_for_column("off", "id")
4710                .is_none(),
4711            "with auto_index_id=false, no implicit index should be created"
4712        );
4713    }
4714
4715    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
4716
4717    #[test]
4718    fn update_single_row_emits_update_event() {
4719        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4720        rt.execute_query(
4721            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4722        )
4723        .unwrap();
4724        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4725            .unwrap();
4726
4727        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4728            .unwrap();
4729
4730        let events = queue_payloads(&rt, "audit_log");
4731        assert_eq!(events.len(), 1, "expected exactly 1 update event");
4732        let event = events[0].as_object().expect("event payload object");
4733        assert_eq!(
4734            event.get("op").and_then(crate::json::Value::as_str),
4735            Some("update")
4736        );
4737        assert_eq!(
4738            event.get("collection").and_then(crate::json::Value::as_str),
4739            Some("users")
4740        );
4741        assert!(event
4742            .get("event_id")
4743            .and_then(crate::json::Value::as_str)
4744            .is_some_and(|v| !v.is_empty()));
4745        let before = event
4746            .get("before")
4747            .and_then(crate::json::Value::as_object)
4748            .expect("before must be an object");
4749        let after = event
4750            .get("after")
4751            .and_then(crate::json::Value::as_object)
4752            .expect("after must be an object");
4753        assert_eq!(
4754            before.get("name").and_then(crate::json::Value::as_str),
4755            Some("Alice"),
4756            "before.name should be the old value"
4757        );
4758        assert_eq!(
4759            after.get("name").and_then(crate::json::Value::as_str),
4760            Some("Bob"),
4761            "after.name should be the new value"
4762        );
4763    }
4764
4765    #[test]
4766    fn update_event_only_includes_changed_fields() {
4767        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4768        rt.execute_query(
4769            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4770        )
4771        .unwrap();
4772        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4773            .unwrap();
4774
4775        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4776            .unwrap();
4777
4778        let events = queue_payloads(&rt, "evts");
4779        assert_eq!(events.len(), 1);
4780        let event = events[0].as_object().unwrap();
4781        let before = event
4782            .get("before")
4783            .and_then(crate::json::Value::as_object)
4784            .unwrap();
4785        let after = event
4786            .get("after")
4787            .and_then(crate::json::Value::as_object)
4788            .unwrap();
4789        // Only changed field included.
4790        assert!(
4791            before.contains_key("name"),
4792            "before must include changed field"
4793        );
4794        assert!(
4795            after.contains_key("name"),
4796            "after must include changed field"
4797        );
4798        // Unchanged fields must not appear.
4799        assert!(
4800            !before.contains_key("email"),
4801            "before must not include unchanged email"
4802        );
4803        assert!(
4804            !after.contains_key("email"),
4805            "after must not include unchanged email"
4806        );
4807    }
4808
4809    #[test]
4810    fn multi_row_update_emits_one_event_per_row() {
4811        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4812        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4813            .unwrap();
4814        rt.execute_query(
4815            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4816        )
4817        .unwrap();
4818
4819        rt.execute_query("UPDATE items SET status = 'done'")
4820            .unwrap();
4821
4822        let events = queue_payloads(&rt, "evts");
4823        assert_eq!(events.len(), 3, "expected one update event per row");
4824        for event in &events {
4825            let obj = event.as_object().unwrap();
4826            assert_eq!(
4827                obj.get("op").and_then(crate::json::Value::as_str),
4828                Some("update")
4829            );
4830        }
4831    }
4832
4833    #[test]
4834    fn delete_single_row_emits_delete_event() {
4835        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4836        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4837            .unwrap();
4838        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4839            .unwrap();
4840
4841        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4842
4843        let events = queue_payloads(&rt, "del_log");
4844        assert_eq!(events.len(), 1);
4845        let event = events[0].as_object().expect("event payload object");
4846        assert_eq!(
4847            event.get("op").and_then(crate::json::Value::as_str),
4848            Some("delete")
4849        );
4850        assert_eq!(
4851            event.get("collection").and_then(crate::json::Value::as_str),
4852            Some("users")
4853        );
4854        assert!(event
4855            .get("event_id")
4856            .and_then(crate::json::Value::as_str)
4857            .is_some_and(|v| !v.is_empty()));
4858        let before = event
4859            .get("before")
4860            .and_then(crate::json::Value::as_object)
4861            .expect("before must be an object for delete");
4862        assert_eq!(
4863            before.get("id").and_then(crate::json::Value::as_u64),
4864            Some(42)
4865        );
4866        assert_eq!(
4867            before.get("name").and_then(crate::json::Value::as_str),
4868            Some("Alice")
4869        );
4870        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
4871    }
4872
4873    #[test]
4874    fn multi_row_delete_emits_one_event_per_row() {
4875        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4876        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
4877            .unwrap();
4878        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
4879            .unwrap();
4880
4881        rt.execute_query("DELETE FROM items").unwrap();
4882
4883        let events = queue_payloads(&rt, "del_log");
4884        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
4885        for event in &events {
4886            let obj = event.as_object().unwrap();
4887            assert_eq!(
4888                obj.get("op").and_then(crate::json::Value::as_str),
4889                Some("delete")
4890            );
4891            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
4892        }
4893    }
4894
4895    #[test]
4896    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
4897        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4898        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
4899            .unwrap();
4900
4901        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4902            .unwrap();
4903        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
4904
4905        let events = queue_payloads(&rt, "evts");
4906        assert!(
4907            events.is_empty(),
4908            "UPDATE-only filter must not emit INSERT or DELETE events"
4909        );
4910    }
4911
4912    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
4913
4914    #[test]
4915    fn suppress_events_on_insert_emits_no_events() {
4916        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4917        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4918            .unwrap();
4919
4920        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4921            .unwrap();
4922
4923        let events = queue_payloads(&rt, "evts");
4924        assert!(
4925            events.is_empty(),
4926            "SUPPRESS EVENTS must prevent INSERT events"
4927        );
4928    }
4929
4930    #[test]
4931    fn suppress_events_on_update_emits_no_events() {
4932        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4933        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4934            .unwrap();
4935        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4936            .unwrap();
4937        // drain the INSERT event
4938        let _ = queue_payloads(&rt, "evts");
4939        // Force pop to drain; simpler: just check new count after UPDATE
4940        rt.execute_query("QUEUE PURGE evts").unwrap();
4941
4942        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
4943            .unwrap();
4944
4945        let events = queue_payloads(&rt, "evts");
4946        assert!(
4947            events.is_empty(),
4948            "SUPPRESS EVENTS must prevent UPDATE events"
4949        );
4950    }
4951
4952    #[test]
4953    fn suppress_events_on_delete_emits_no_events() {
4954        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4955        rt.execute_query(
4956            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
4957        )
4958        .unwrap();
4959        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4960            .unwrap();
4961
4962        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
4963            .unwrap();
4964
4965        let events = queue_payloads(&rt, "evts");
4966        assert!(
4967            events.is_empty(),
4968            "SUPPRESS EVENTS must prevent DELETE events"
4969        );
4970    }
4971
4972    #[test]
4973    fn normal_insert_after_suppress_still_emits() {
4974        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4975        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4976            .unwrap();
4977
4978        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4979            .unwrap();
4980        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
4981            .unwrap();
4982
4983        let events = queue_payloads(&rt, "evts");
4984        assert_eq!(
4985            events.len(),
4986            1,
4987            "only the non-suppressed INSERT should emit"
4988        );
4989        assert_eq!(
4990            events[0].get("id").and_then(crate::json::Value::as_u64),
4991            Some(2)
4992        );
4993    }
4994}