Skip to main content

reddb_server/runtime/
impl_dml.rs

1//! DML execution: INSERT, UPDATE, DELETE via SQL AST
2//!
3//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
4//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
5//! operations through the existing `RuntimeEntityPort` trait so that all
6//! cross-cutting concerns (WAL, indexing, replication) are automatically
7//! applied.
8
9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11    CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12    CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13    RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16    build_row_update_contract_plan, entity_row_fields_snapshot,
17    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18    RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27    sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28    sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
// NOTE(review): presumably the number of rows applied per batch when an
// UPDATE is processed in chunks — the use site is outside this view; confirm.
const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
// NOTE(review): looks like the edge label that links a tree node to its
// child — confirm against the tree code that consumes it.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
// NOTE(review): appears to be the key prefix for tree-related metadata
// entries — confirm against the code that reads it.
const TREE_METADATA_PREFIX: &str = "red.tree.";
40
/// A single UPDATE assignment kept in expression form.
///
/// NOTE(review): `CompiledUpdatePlan::dynamic_assignments` stores these, so
/// they are presumably the assignments that must be evaluated per row — the
/// plan builder is outside this view; confirm.
#[derive(Clone)]
struct CompiledUpdateAssignment {
    /// Name of the assignment target.
    column: String,
    /// Expression associated with the assignment (presumably its right-hand
    /// side — confirm).
    expr: Expr,
    /// Optional binary operator; presumably set for compound assignments —
    /// confirm against the parser.
    compound_op: Option<BinOp>,
    /// Optional static metadata key; presumably set when the target is a
    /// metadata entry rather than a row field — confirm.
    metadata_key: Option<&'static str>,
    /// Optional row-update rule for this column; presumably taken from the
    /// row contract plan — confirm.
    row_rule: Option<RowUpdateColumnRule>,
}
49
/// Pre-compiled description of an UPDATE statement's assignments.
///
/// NOTE(review): the builder and consumers are outside this view; the field
/// notes below are inferred from names and types — confirm.
struct CompiledUpdatePlan {
    /// `(name, value)` pairs; presumably field assignments whose value is
    /// already known when the plan is built.
    static_field_assignments: Vec<(String, Value)>,
    /// `(key, value)` pairs; presumably metadata assignments whose value is
    /// already known when the plan is built.
    static_metadata_assignments: Vec<(String, MetadataValue)>,
    /// Assignments kept as expressions; presumably evaluated per row.
    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    /// Optional row-update contract plan for the target.
    row_contract_plan: Option<RowUpdateContractPlan>,
    /// Column names; presumably the row columns this UPDATE modifies.
    row_modified_columns: Vec<String>,
    /// Flag; presumably true when a modified column has a uniqueness
    /// constraint.
    row_touches_unique_columns: bool,
}
58
/// Evaluated assignment values, split into field and metadata lists.
///
/// NOTE(review): presumably the per-row result of evaluating
/// `CompiledUpdatePlan::dynamic_assignments` — the producer is outside this
/// view; confirm.
#[derive(Default)]
struct MaterializedUpdateAssignments {
    /// `(name, value)` pairs for field assignments.
    dynamic_field_assignments: Vec<(String, Value)>,
    /// `(key, value)` pairs for metadata assignments.
    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
}
64
65impl RedDBRuntime {
66    /// Issue #524 — public read of the in-memory chain tip. Returns `None`
67    /// when the collection is not a chain or has no rows (pre-genesis). On a
68    /// cold cache the first call falls back to a one-time scan so the HTTP
69    /// `GET /collections/:name/chain-tip` handler stays consistent with the
70    /// INSERT path after a restart.
71    pub fn chain_tip_for_collection(
72        &self,
73        collection: &str,
74    ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75        let store = self.inner.db.store();
76        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77            return None;
78        }
79        let mut cache = self.inner.chain_tip_cache.lock();
80        if let Some(existing) = cache.get(collection) {
81            return Some(existing.clone());
82        }
83        let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84        cache.insert(collection.to_string(), scanned.clone());
85        Some(scanned)
86    }
87
88    /// Issue #525 — walks the chain end-to-end, recomputes each block's hash
89    /// against the stored fields, and returns the verification outcome.  On
90    /// `ok == false` the integrity flag is persisted and the in-memory cache
91    /// is updated so subsequent INSERTs surface `ChainIntegrityBroken`.
92    ///
93    /// Returns `None` when the collection is absent or not a `KIND blockchain`.
94    pub fn verify_chain_for_collection(
95        &self,
96        collection: &str,
97    ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98        let store = self.inner.db.store();
99        let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100        if !outcome.ok {
101            crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102            self.inner
103                .chain_integrity_broken
104                .lock()
105                .insert(collection.to_string(), true);
106        }
107        Some(outcome)
108    }
109
110    /// Issue #525 — admin clears the `ChainIntegrityBroken` flag so the chain
111    /// accepts INSERTs again.  Returns `false` when the collection is not a
112    /// chain.
113    pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114        let store = self.inner.db.store();
115        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116            return false;
117        }
118        crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119        self.inner
120            .chain_integrity_broken
121            .lock()
122            .insert(collection.to_string(), false);
123        true
124    }
125
126    /// Issue #525 — INSERT-time check.  Combines in-memory cache (fast path)
127    /// with a one-time scan of `red_config` on cold start so the flag survives
128    /// restart.
129    fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130        {
131            let cache = self.inner.chain_integrity_broken.lock();
132            if let Some(v) = cache.get(collection) {
133                return *v;
134            }
135        }
136        let store = self.inner.db.store();
137        let persisted =
138            crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139                .unwrap_or(false);
140        self.inner
141            .chain_integrity_broken
142            .lock()
143            .insert(collection.to_string(), persisted);
144        persisted
145    }
146
147    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
148    /// target table is tenant-scoped and the user's column list does
149    /// not already name the tenant column.
150    ///
151    /// Returns:
152    /// * `Ok(None)` — no injection needed (non-tenant table, or user
153    ///   supplied the column explicitly). Caller uses the original
154    ///   query unchanged.
155    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
156    ///   + literal value appended to every row.
157    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
158    ///   the current session. Fails loudly so callers don't produce
159    ///   rows that RLS would then hide on read.
160    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
161        let Some(tenant_col) = self.tenant_column(&query.table) else {
162            return Ok(None);
163        };
164        // User already named the column (literal match) — trust them.
165        if query
166            .columns
167            .iter()
168            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
169        {
170            return Ok(None);
171        }
172
173        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
174        // nested key like `headers.tenant` we operate on the root
175        // column (`headers`) and set / add the nested path inside its
176        // JSON value. If the user named the root column we mutate in
177        // place; otherwise we create a fresh JSON column for every row.
178        if let Some(dot_pos) = tenant_col.find('.') {
179            let (root, tail) = tenant_col.split_at(dot_pos);
180            let tail = &tail[1..]; // drop leading '.'
181            return self.inject_dotted_tenant(query, root, tail);
182        }
183
184        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
185            return Err(RedDBError::Query(format!(
186                "INSERT into tenant-scoped table '{}' requires an active tenant — \
187                 run SET TENANT '<id>' first or name column '{}' explicitly",
188                query.table, tenant_col
189            )));
190        };
191
192        let mut augmented = query.clone();
193        augmented.columns.push(tenant_col);
194        let lit = Value::text(tenant_id.clone());
195        for row in augmented.values.iter_mut() {
196            row.push(lit.clone());
197        }
198        for row in augmented.value_exprs.iter_mut() {
199            row.push(crate::storage::query::ast::Expr::Literal {
200                value: lit.clone(),
201                span: crate::storage::query::ast::Span::synthetic(),
202            });
203        }
204        Ok(Some(augmented))
205    }
206
207    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
208    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
209    /// nested JSON instead of appending a flat column.
210    ///
211    /// Cases:
212    /// * Root column already in the INSERT list → mutate per-row JSON
213    ///   (parse, set path, re-serialize).
214    /// * Root column absent → create a fresh `{tail: tenant}` JSON
215    ///   object and append the root column to the INSERT.
216    fn inject_dotted_tenant(
217        &self,
218        query: &InsertQuery,
219        root: &str,
220        tail: &str,
221    ) -> RedDBResult<Option<InsertQuery>> {
222        let active_tenant = crate::runtime::impl_core::current_tenant();
223        let mut augmented = query.clone();
224        let root_idx = augmented
225            .columns
226            .iter()
227            .position(|c| c.eq_ignore_ascii_case(root));
228
229        if let Some(idx) = root_idx {
230            // User supplied the root column. Per-row: if the dotted
231            // tail is already present we trust the user (admin / bulk
232            // loader scenario); otherwise fill from the active
233            // tenant. An unbound tenant is only an error when some
234            // row actually needs filling.
235            for row in augmented.values.iter_mut() {
236                let Some(slot) = row.get_mut(idx) else {
237                    continue;
238                };
239                if dotted_tail_already_set(slot, tail) {
240                    continue;
241                }
242                let Some(tenant_id) = &active_tenant else {
243                    return Err(RedDBError::Query(format!(
244                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
245                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
246                        query.table, root, tail
247                    )));
248                };
249                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
250            }
251            // Expression row is kept in sync by re-wrapping the
252            // mutated literal; the canonical path will re-evaluate
253            // against the same JSON shape.
254            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
255                if let Some(slot) = row.get_mut(idx) {
256                    let new_value = augmented
257                        .values
258                        .get(row_idx)
259                        .and_then(|v| v.get(idx))
260                        .cloned()
261                        .unwrap_or(Value::Null);
262                    *slot = crate::storage::query::ast::Expr::Literal {
263                        value: new_value,
264                        span: crate::storage::query::ast::Span::synthetic(),
265                    };
266                }
267            }
268        } else {
269            // No root column in the INSERT list — auto-fill needs a
270            // bound tenant to synthesise one. Error loud so we never
271            // create a tenant-less row that RLS would then hide.
272            let Some(tenant_id) = &active_tenant else {
273                return Err(RedDBError::Query(format!(
274                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
275                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
276                    query.table, root, tail
277                )));
278            };
279            // Create a fresh JSON column with only the tenant path set.
280            augmented.columns.push(root.to_string());
281            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
282            for row in augmented.values.iter_mut() {
283                row.push(fresh.clone());
284            }
285            for row in augmented.value_exprs.iter_mut() {
286                row.push(crate::storage::query::ast::Expr::Literal {
287                    value: fresh.clone(),
288                    span: crate::storage::query::ast::Span::synthetic(),
289                });
290            }
291        }
292
293        Ok(Some(augmented))
294    }
295
296    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
297    /// `lsns` is empty because events fire at commit time.
298    fn delete_entities_batch(
299        &self,
300        collection: &str,
301        ids: &[EntityId],
302    ) -> RedDBResult<(u64, Vec<u64>)> {
303        if ids.is_empty() {
304            return Ok((0, vec![]));
305        }
306
307        let store = self.db().store();
308        let Some(manager) = store.get_collection(collection) else {
309            return Ok((0, vec![]));
310        };
311
312        let active_xid = self.current_xid();
313        let conn_id = crate::runtime::impl_core::current_connection_id();
314        let mut autocommit_xid = None;
315        let mut tombstoned_ids = Vec::new();
316        let mut tombstoned_entities = Vec::new();
317        let mut physical_delete_ids = Vec::new();
318        let table_row_resolver =
319            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
320
321        for &id in ids {
322            let Some(mut entity) = manager.get(id) else {
323                continue;
324            };
325            if matches!(entity.data, EntityData::Row(_)) {
326                let previous_xmax = entity.xmax;
327                if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
328                    if table_row_resolver.resolve_candidate(&entity).is_none() {
329                        continue;
330                    }
331                } else if entity.xmax != 0 {
332                    continue;
333                }
334
335                let xid = match active_xid {
336                    Some(xid) => xid,
337                    None => match autocommit_xid {
338                        Some(xid) => xid,
339                        None => {
340                            let mgr = self.snapshot_manager();
341                            let xid = mgr.begin();
342                            autocommit_xid = Some(xid);
343                            xid
344                        }
345                    },
346                };
347                entity.set_xmax(xid);
348                if manager.update(entity.clone()).is_ok() {
349                    if active_xid.is_some() {
350                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
351                    }
352                    tombstoned_entities.push(entity);
353                    tombstoned_ids.push(id);
354                }
355            } else {
356                physical_delete_ids.push(id);
357            }
358        }
359
360        if let Some(xid) = autocommit_xid {
361            self.snapshot_manager().commit(xid);
362        }
363
364        let mut affected = tombstoned_ids.len() as u64;
365        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
366        if active_xid.is_some() {
367            store
368                .persist_entities_to_pager(collection, &tombstoned_entities)
369                .map_err(|err| RedDBError::Internal(err.to_string()))?;
370        } else {
371            store
372                .persist_entities_to_pager(collection, &tombstoned_entities)
373                .map_err(|err| RedDBError::Internal(err.to_string()))?;
374            for id in &tombstoned_ids {
375                store.context_index().remove_entity(*id);
376                let lsn = self.cdc_emit(
377                    crate::replication::cdc::ChangeOperation::Delete,
378                    collection,
379                    id.raw(),
380                    "entity",
381                );
382                lsns.push(lsn);
383            }
384        }
385
386        let deleted_ids = store
387            .delete_batch(collection, &physical_delete_ids)
388            .map_err(|err| RedDBError::Internal(err.to_string()))?;
389        affected += deleted_ids.len() as u64;
390        for id in &deleted_ids {
391            store.context_index().remove_entity(*id);
392            let lsn = self.cdc_emit(
393                crate::replication::cdc::ChangeOperation::Delete,
394                collection,
395                id.raw(),
396                "entity",
397            );
398            lsns.push(lsn);
399        }
400
401        Ok((affected, lsns))
402    }
403
404    /// Flushes context-index updates and CDC for each applied mutation.
405    /// Returns one LSN per entity in the same order as `applied`.
406    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
407        if applied.is_empty() {
408            return Ok(Vec::new());
409        }
410
411        let store = self.db().store();
412        if applied.iter().any(|item| item.context_index_dirty) {
413            store.context_index().index_entities(
414                &applied[0].collection,
415                applied
416                    .iter()
417                    .filter(|item| item.context_index_dirty)
418                    .map(|item| &item.entity),
419            );
420        }
421
422        for item in applied {
423            self.refresh_update_secondary_indexes(item)?;
424        }
425
426        let mut lsns = Vec::with_capacity(applied.len());
427        for item in applied {
428            let lsn = self.cdc_emit_prebuilt(
429                crate::replication::cdc::ChangeOperation::Update,
430                &item.collection,
431                &item.entity,
432                update_cdc_item_kind(self, &item.collection, &item.entity),
433                item.metadata.as_ref(),
434                false,
435            );
436            lsns.push(lsn);
437        }
438        Ok(lsns)
439    }
440
    /// Persists one chunk of applied UPDATE mutations by delegating to
    /// `persist_applied_entity_mutations`; its result is returned unchanged.
    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
        self.persist_applied_entity_mutations(applied)
    }
444
445    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
446        if applied.pre_mutation_fields.is_empty() {
447            return Ok(());
448        }
449        let post = entity_row_fields_snapshot(&applied.entity);
450        if post.is_empty() {
451            return Ok(());
452        }
453
454        let indexed_cols = self
455            .index_store_ref()
456            .indexed_columns_set(&applied.collection);
457        if indexed_cols.is_empty() {
458            return Ok(());
459        }
460
461        if let Some(old_version) = applied.replaced_entity.as_ref() {
462            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
463                .pre_mutation_fields
464                .iter()
465                .filter(|(col, _)| indexed_cols.contains(col))
466                .cloned()
467                .collect();
468            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
469                .iter()
470                .filter(|(col, _)| indexed_cols.contains(col))
471                .cloned()
472                .collect();
473            if !old_index_fields.is_empty() {
474                self.index_store_ref()
475                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
476                    .map_err(crate::RedDBError::Internal)?;
477            }
478            if !new_index_fields.is_empty() {
479                self.index_store_ref()
480                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
481                    .map_err(crate::RedDBError::Internal)?;
482            }
483            return Ok(());
484        }
485
486        let damage =
487            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
488        if damage
489            .touched_columns()
490            .into_iter()
491            .any(|col| indexed_cols.contains(col))
492        {
493            self.index_store_ref()
494                .index_entity_update(
495                    &applied.collection,
496                    applied.id,
497                    &applied.pre_mutation_fields,
498                    &post,
499                )
500                .map_err(crate::RedDBError::Internal)?;
501        }
502        Ok(())
503    }
504
505    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
506    ///
507    /// Each row in `query.values` is zipped with `query.columns` to produce a
508    /// set of named fields, which is then dispatched based on entity_type.
509    pub fn execute_insert(
510        &self,
511        raw_query: &str,
512        query: &InsertQuery,
513    ) -> RedDBResult<RuntimeQueryResult> {
514        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
515        // CollectionContract gate (#49): single entry point for the
516        // operator's collection-level write rules. Today this is a
517        // no-op for INSERT (APPEND ONLY permits insert); routing
518        // through the gate now means future contract bits — versioned,
519        // vault-only writes — plug in once instead of per verb.
520        crate::runtime::collection_contract::CollectionContractGate::check(
521            self,
522            &query.table,
523            crate::runtime::collection_contract::MutationKind::Insert,
524        )?;
525        // Phase 2.5.4 table-scoped tenancy: if the target table is
526        // tenant-scoped and the user didn't name the tenant column,
527        // auto-inject it with the thread-local `CURRENT_TENANT()`
528        // value. When the column is named explicitly we trust the
529        // caller (useful for admin tooling that writes on behalf of
530        // specific tenants). An unbound tenant on an implicit-fill
531        // path errors up front rather than producing a row the RLS
532        // policy would silently hide.
533        let augmented_owned;
534        let query = match self.maybe_inject_tenant_column(query)? {
535            Some(new_q) => {
536                augmented_owned = new_q;
537                &augmented_owned
538            }
539            None => query,
540        };
541        self.check_insert_column_policy(query)?;
542
543        let mut inserted_count: u64 = 0;
544        let effective_rows =
545            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
546
547        // Ensure the collection exists (auto-create on first insert).
548        let store = self.inner.db.store();
549        let _ = store.get_or_create_collection(&query.table);
550        let declared_model = self
551            .db()
552            .collection_contract_arc(&query.table)
553            .map(|contract| contract.declared_model);
554
555        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
556            if query.returning.is_some() {
557                Some(Vec::with_capacity(effective_rows.len()))
558            } else {
559                None
560            };
561        let mut returning_result: Option<UnifiedResult> = None;
562
563        if matches!(query.entity_type, InsertEntityType::Row)
564            && !matches!(
565                declared_model,
566                Some(crate::catalog::CollectionModel::TimeSeries)
567            )
568        {
569            // Issue #523 + #524: blockchain collections seal each row into the
570            // chain. When the caller omits the reserved columns, the engine
571            // auto-fills (#523). When the caller supplies any reserved column,
572            // the values are validated against the current tip and a mismatch
573            // surfaces a `BlockchainConflict:` error mapped to HTTP 409 (#524).
574            //
575            // The whole batch runs under a per-collection chain lock so two
576            // concurrent submitters can't both bind to the same prev_hash —
577            // the loser observes the advanced tip and gets 409 with the new
578            // tip so it can retry.
579            let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
580            let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
581                Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
582            } else {
583                None
584            };
585            let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
586
587            // Issue #525 — refuse new blocks if the chain has been marked
588            // `integrity = broken` until an admin clears the flag.
589            if chain_mode && self.is_chain_integrity_broken(&query.table) {
590                return Err(RedDBError::InvalidOperation(format!(
591                    "ChainIntegrityBroken: collection '{}' is locked until \
592                     POST /collections/{}/clear-integrity-flag is called by an admin",
593                    query.table, query.table
594                )));
595            }
596
597            // Pull the tip from the in-memory cache; fall back to a one-time
598            // scan if the cache hasn't seen this collection yet (cold start
599            // after restart). Cache is updated below as rows are sealed.
600            let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
601                if chain_mode {
602                    let mut cache = self.inner.chain_tip_cache.lock();
603                    if let Some(existing) = cache.get(&query.table) {
604                        Some(existing.clone())
605                    } else if let Some(scanned) =
606                        crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
607                    {
608                        cache.insert(query.table.clone(), scanned.clone());
609                        Some(scanned)
610                    } else {
611                        None
612                    }
613                } else {
614                    None
615                };
616
617            let mut rows = Vec::with_capacity(effective_rows.len());
618            for row_values in &effective_rows {
619                if row_values.len() != query.columns.len() {
620                    return Err(RedDBError::Query(format!(
621                        "INSERT column count ({}) does not match value count ({})",
622                        query.columns.len(),
623                        row_values.len()
624                    )));
625                }
626                let (mut fields, mut metadata) =
627                    split_insert_metadata(self, &query.columns, row_values)?;
628                if chain_mode {
629                    use crate::runtime::blockchain_kind::{
630                        chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
631                        COL_TIMESTAMP, RESERVED_COLUMNS,
632                    };
633                    let supplied_height = fields
634                        .iter()
635                        .find(|(k, _)| k == COL_BLOCK_HEIGHT)
636                        .map(|(_, v)| v.clone());
637                    let supplied_prev = fields
638                        .iter()
639                        .find(|(k, _)| k == COL_PREV_HASH)
640                        .map(|(_, v)| v.clone());
641                    let supplied_ts = fields
642                        .iter()
643                        .find(|(k, _)| k == COL_TIMESTAMP)
644                        .map(|(_, v)| v.clone());
645                    let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
646                    let user_supplied_any = supplied_height.is_some()
647                        || supplied_prev.is_some()
648                        || supplied_ts.is_some()
649                        || supplied_hash;
650
651                    fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
652                    let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
653
654                    let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
655                        Some(t) => (t.hash, t.height + 1),
656                        None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
657                    };
658                    let server_now = crate::runtime::blockchain_kind::now_ms();
659
660                    let (use_prev, use_height, use_ts) = if user_supplied_any {
661                        // Caller is participating in the chain protocol —
662                        // every field must be supplied AND match the tip.
663                        if supplied_hash {
664                            return Err(chain_conflict_error(
665                                tip_next_height.saturating_sub(1),
666                                tip_prev_hash,
667                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
668                                server_now,
669                                "hash column is engine-computed and cannot be supplied",
670                            ));
671                        }
672                        let caller_prev = match &supplied_prev {
673                            Some(Value::Blob(b)) if b.len() == 32 => {
674                                let mut a = [0u8; 32];
675                                a.copy_from_slice(b);
676                                a
677                            }
678                            Some(Value::Text(s)) if s.len() == 64 => {
679                                // Accept hex-encoded prev_hash so JSON / SQL
680                                // callers without literal-blob syntax can
681                                // still participate in the chain protocol.
682                                let mut a = [0u8; 32];
683                                let mut ok = true;
684                                for (i, slot) in a.iter_mut().enumerate() {
685                                    let pair = &s.as_ref()[i * 2..i * 2 + 2];
686                                    match u8::from_str_radix(pair, 16) {
687                                        Ok(byte) => *slot = byte,
688                                        Err(_) => {
689                                            ok = false;
690                                            break;
691                                        }
692                                    }
693                                }
694                                if !ok {
695                                    return Err(chain_conflict_error(
696                                        tip_next_height.saturating_sub(1),
697                                        tip_prev_hash,
698                                        chain_tip_full
699                                            .as_ref()
700                                            .map(|t| t.timestamp_ms)
701                                            .unwrap_or(0),
702                                        server_now,
703                                        "prev_hash is not valid hex",
704                                    ));
705                                }
706                                a
707                            }
708                            _ => {
709                                return Err(chain_conflict_error(
710                                    tip_next_height.saturating_sub(1),
711                                    tip_prev_hash,
712                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
713                                    server_now,
714                                    "prev_hash missing or not a 32-byte Blob",
715                                ));
716                            }
717                        };
718                        if caller_prev != tip_prev_hash {
719                            return Err(chain_conflict_error(
720                                tip_next_height.saturating_sub(1),
721                                tip_prev_hash,
722                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
723                                server_now,
724                                "prev_hash does not match current tip",
725                            ));
726                        }
727                        let caller_height = match &supplied_height {
728                            Some(Value::UnsignedInteger(v)) => *v,
729                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
730                            _ => {
731                                return Err(chain_conflict_error(
732                                    tip_next_height.saturating_sub(1),
733                                    tip_prev_hash,
734                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
735                                    server_now,
736                                    "block_height missing or not an unsigned integer",
737                                ));
738                            }
739                        };
740                        if caller_height != tip_next_height {
741                            return Err(chain_conflict_error(
742                                tip_next_height.saturating_sub(1),
743                                tip_prev_hash,
744                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
745                                server_now,
746                                "block_height does not match tip+1",
747                            ));
748                        }
749                        let caller_ts = match &supplied_ts {
750                            Some(Value::UnsignedInteger(v)) => *v,
751                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
752                            _ => {
753                                return Err(chain_conflict_error(
754                                    tip_next_height.saturating_sub(1),
755                                    tip_prev_hash,
756                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
757                                    server_now,
758                                    "timestamp missing or not an unsigned integer",
759                                ));
760                            }
761                        };
762                        let drift = (caller_ts as i128) - (server_now as i128);
763                        if drift.abs() > 60_000 {
764                            return Err(chain_conflict_error(
765                                tip_next_height.saturating_sub(1),
766                                tip_prev_hash,
767                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
768                                server_now,
769                                "timestamp outside ±60s of server_time",
770                            ));
771                        }
772                        (caller_prev, caller_height, caller_ts)
773                    } else {
774                        (tip_prev_hash, tip_next_height, server_now)
775                    };
776
777                    let (reserved, new_hash) =
778                        crate::runtime::blockchain_kind::make_block_reserved_fields(
779                            use_prev, use_height, use_ts, &payload,
780                        );
781                    fields.extend(reserved);
782                    chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
783                        height: use_height,
784                        hash: new_hash,
785                        timestamp_ms: use_ts,
786                    });
787                }
788                // Issue #522 — signed-writes verification. On collections
789                // created with `SIGNED_BY (...)` the row must carry valid
790                // `signer_pubkey` + `signature` reserved columns. Runs
791                // after chain_mode so canonical payload covers user-supplied
792                // fields only (blockchain reserved columns are filtered by
793                // `canonical_payload`; the two signed-writes reserved
794                // columns are split out before payload computation, then
795                // re-attached for storage). The blockchain + SIGNED_BY
796                // composition is owned by issue #526; we keep #522 to the
797                // non-chain path and let chain_mode collections punt to that
798                // slice rather than half-wire it here.
799                if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
800                    let (pk_col, sig_col, residual) =
801                        crate::runtime::signed_writes_kind::split_signature_fields(fields);
802                    let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
803                    let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
804                    crate::runtime::signed_writes_kind::verify_row(
805                        &reg,
806                        pk_col.as_ref().map(|c| c.bytes.as_slice()),
807                        sig_col.as_ref().map(|c| c.bytes.as_slice()),
808                        &payload,
809                    )
810                    .map_err(crate::runtime::signed_writes_kind::map_error)?;
811                    fields = residual;
812                    // Round-trip the reserved columns with the value
813                    // type the caller supplied (Text/hex on the SQL path,
814                    // Blob on the binary path). Keeps SELECT and WHERE
815                    // predicates symmetric with the INSERT shape.
816                    if let Some(col) = pk_col {
817                        fields.push((
818                            crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
819                            col.raw_value,
820                        ));
821                    }
822                    if let Some(col) = sig_col {
823                        fields.push((
824                            crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
825                            col.raw_value,
826                        ));
827                    }
828                }
829                merge_with_clauses(
830                    &mut metadata,
831                    query.ttl_ms,
832                    query.expires_at_ms,
833                    &query.with_metadata,
834                );
835                if let Some(snaps) = returning_snapshots.as_mut() {
836                    snaps.push(fields.clone());
837                }
838                rows.push(CreateRowInput {
839                    collection: query.table.clone(),
840                    fields,
841                    metadata,
842                    node_links: Vec::new(),
843                    vector_links: Vec::new(),
844                });
845            }
846            let outputs = self.create_rows_batch(CreateRowsBatchInput {
847                collection: query.table.clone(),
848                rows,
849                suppress_events: query.suppress_events,
850            })?;
851            inserted_count = outputs.len() as u64;
852
853            // Chain mode: commit the new tip to the in-memory cache only after
854            // the batch persisted successfully. If the batch threw mid-way the
855            // cache stays on the previous tip and the chain lock releases.
856            if chain_mode {
857                if let Some(new_tip) = chain_tip_full.as_ref() {
858                    self.inner
859                        .chain_tip_cache
860                        .lock()
861                        .insert(query.table.clone(), new_tip.clone());
862                }
863            }
864
865            // Hypertable chunk routing: if this table was declared via
866            // CREATE HYPERTABLE, register each row's time-column value
867            // with the registry so chunk metadata (bounds, row counts,
868            // TTL eligibility) stays current. This is what lets
869            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
870            // retention daemon sweep expired chunks without scanning
871            // every row.
872            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
873                let time_col = &spec.time_column;
874                // Find the column's index in the INSERT column list.
875                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
876                    for row in &effective_rows {
877                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
878                            if *n >= 0 {
879                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
880                            }
881                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
882                            let _ = self.inner.db.hypertables().route(&query.table, *n);
883                        }
884                    }
885                }
886            }
887
888            if let (Some(items), Some(snaps)) =
889                (query.returning.as_ref(), returning_snapshots.take())
890            {
891                let snaps = row_insert_returning_snapshots(&outputs, snaps);
892                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
893            }
894        } else {
895            // Issue #419: surface the inserted entity id on every INSERT path.
896            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
897            // so a RETURNING clause (and the unconditional inserted_ids list,
898            // below) can expose the engine-assigned id. TimeSeries (the row
899            // branch in this else) still returns the not-supported error
900            // because create_timeseries_point isn't plumbed through this fn.
901            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
902                Vec::with_capacity(effective_rows.len());
903            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
904            {
905                Vec::with_capacity(effective_rows.len())
906            } else {
907                Vec::new()
908            };
909            if matches!(
910                query.entity_type,
911                InsertEntityType::Node | InsertEntityType::Edge
912            ) {
913                enum PreparedGraphInsert {
                    // One NODE or EDGE row that has already been validated and turned
                    // into a create-input. Rows are staged in this form so the whole
                    // statement is checked before anything is added to the write batch.
                    //
                    // In both variants `fields` holds the row's column/value pairs as
                    // returned by `split_insert_metadata`; they are pushed onto the
                    // RETURNING snapshots when the statement has a RETURNING clause.

                    /// A graph node row; `input` supplies the collection, label,
                    /// node type, properties and metadata handed to the batch.
914                    Node {
915                        fields: Vec<(String, Value)>,
916                        input: CreateNodeInput,
917                    },
                    /// A graph edge row; `input` supplies the collection, label,
                    /// endpoints, weight, properties and metadata handed to the batch.
918                    Edge {
919                        fields: Vec<(String, Value)>,
920                        input: CreateEdgeInput,
921                    },
922                }
923
924                let mut prepared = Vec::with_capacity(effective_rows.len());
925                for row_values in &effective_rows {
926                    if row_values.len() != query.columns.len() {
927                        return Err(RedDBError::Query(format!(
928                            "INSERT column count ({}) does not match value count ({})",
929                            query.columns.len(),
930                            row_values.len()
931                        )));
932                    }
933
934                    match query.entity_type {
935                        InsertEntityType::Node => {
936                            let (node_values, mut metadata) =
937                                split_insert_metadata(self, &query.columns, row_values)?;
938                            merge_with_clauses(
939                                &mut metadata,
940                                query.ttl_ms,
941                                query.expires_at_ms,
942                                &query.with_metadata,
943                            );
944                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
945                            apply_collection_default_ttl_metadata(
946                                self,
947                                &query.table,
948                                &mut metadata,
949                            );
950                            let (columns, values) = pairwise_columns_values(&node_values);
951                            let label = find_column_value_string(&columns, &values, "label")?;
952                            let node_type =
953                                find_column_value_opt_string(&columns, &values, "node_type");
954                            let properties = extract_remaining_properties(
955                                &columns,
956                                &values,
957                                &["label", "node_type"],
958                            );
959                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
960                                properties.iter().map(|(key, _)| key.as_str()),
961                                &format!("node '{}'", query.table),
962                            )?;
963                            prepared.push(PreparedGraphInsert::Node {
964                                fields: node_values,
965                                input: CreateNodeInput {
966                                    collection: query.table.clone(),
967                                    label,
968                                    node_type,
969                                    properties,
970                                    metadata,
971                                    embeddings: Vec::new(),
972                                    table_links: Vec::new(),
973                                    node_links: Vec::new(),
974                                },
975                            });
976                        }
977                        InsertEntityType::Edge => {
978                            let (edge_values, mut metadata) =
979                                split_insert_metadata(self, &query.columns, row_values)?;
980                            merge_with_clauses(
981                                &mut metadata,
982                                query.ttl_ms,
983                                query.expires_at_ms,
984                                &query.with_metadata,
985                            );
986                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
987                            apply_collection_default_ttl_metadata(
988                                self,
989                                &query.table,
990                                &mut metadata,
991                            );
992                            let (columns, values) = pairwise_columns_values(&edge_values);
993                            let label = find_column_value_string(&columns, &values, "label")?;
994                            ensure_non_tree_structural_edge_label(&label)?;
995                            let from_id = resolve_edge_endpoint_any(
996                                self.inner.db.store().as_ref(),
997                                &query.table,
998                                &columns,
999                                &values,
1000                                &["from_rid", "from"],
1001                            )?;
1002                            let to_id = resolve_edge_endpoint_any(
1003                                self.inner.db.store().as_ref(),
1004                                &query.table,
1005                                &columns,
1006                                &values,
1007                                &["to_rid", "to"],
1008                            )?;
1009                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
1010                            let properties = extract_remaining_properties(
1011                                &columns,
1012                                &values,
1013                                &["label", "from_rid", "to_rid", "from", "to", "weight"],
1014                            );
1015                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1016                                properties.iter().map(|(key, _)| key.as_str()),
1017                                &format!("edge '{}'", query.table),
1018                            )?;
1019                            prepared.push(PreparedGraphInsert::Edge {
1020                                fields: edge_values,
1021                                input: CreateEdgeInput {
1022                                    collection: query.table.clone(),
1023                                    label,
1024                                    from: EntityId::new(from_id),
1025                                    to: EntityId::new(to_id),
1026                                    weight,
1027                                    properties,
1028                                    metadata,
1029                                },
1030                            });
1031                        }
1032                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1033                    }
1034                }
1035
1036                ensure_graph_insert_contract(self, &query.table)?;
1037                let mut batch = self.inner.db.batch();
1038                for item in prepared {
1039                    match item {
1040                        PreparedGraphInsert::Node { fields, input } => {
1041                            if query.returning.is_some() {
1042                                returning_field_snaps.push(fields);
1043                            }
1044                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1045                            batch = batch.add_node_with_type(
1046                                input.collection,
1047                                input.label,
1048                                node_type,
1049                                input.properties.into_iter().collect(),
1050                                input.metadata.into_iter().collect(),
1051                            );
1052                        }
1053                        PreparedGraphInsert::Edge { fields, input } => {
1054                            if query.returning.is_some() {
1055                                returning_field_snaps.push(fields);
1056                            }
1057                            batch = batch.add_edge(
1058                                input.collection,
1059                                input.label,
1060                                input.from,
1061                                input.to,
1062                                input.weight.unwrap_or(1.0),
1063                                input.properties.into_iter().collect(),
1064                                input.metadata.into_iter().collect(),
1065                            );
1066                        }
1067                    }
1068                }
1069                let batch_result = batch
1070                    .execute()
1071                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1072                let (ids, entity_kind) = match query.entity_type {
1073                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1074                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1075                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1076                };
1077                for id in &ids {
1078                    self.stamp_xmin_if_in_txn(&query.table, *id);
1079                }
1080                if query.returning.is_some() {
1081                    returning_field_snaps = graph_insert_returning_snapshots(
1082                        self.inner.db.store().as_ref(),
1083                        &query.table,
1084                        &ids,
1085                    );
1086                }
1087                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1088                let store = self.inner.db.store();
1089                entity_outputs.extend(ids.iter().map(|id| {
1090                    crate::application::entity::CreateEntityOutput {
1091                        id: *id,
1092                        entity: store.get(&query.table, *id),
1093                    }
1094                }));
1095                inserted_count = ids.len() as u64;
1096            } else {
1097                for row_values in &effective_rows {
1098                    if row_values.len() != query.columns.len() {
1099                        return Err(RedDBError::Query(format!(
1100                            "INSERT column count ({}) does not match value count ({})",
1101                            query.columns.len(),
1102                            row_values.len()
1103                        )));
1104                    }
1105
1106                    match query.entity_type {
1107                        InsertEntityType::Row => {
1108                            if query.returning.is_some() {
1109                                return Err(RedDBError::Query(
1110                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1111                                    .to_string(),
1112                            ));
1113                            }
1114                            let (fields, mut metadata) =
1115                                split_insert_metadata(self, &query.columns, row_values)?;
1116                            merge_with_clauses(
1117                                &mut metadata,
1118                                query.ttl_ms,
1119                                query.expires_at_ms,
1120                                &query.with_metadata,
1121                            );
1122                            self.insert_timeseries_point(&query.table, fields, metadata)?;
1123                        }
1124                        InsertEntityType::Node | InsertEntityType::Edge => {
1125                            unreachable!("NODE and EDGE are handled by the prepared graph path")
1126                        }
1127                        InsertEntityType::Vector => {
1128                            let (vector_values, mut metadata) =
1129                                split_insert_metadata(self, &query.columns, row_values)?;
1130                            merge_with_clauses(
1131                                &mut metadata,
1132                                query.ttl_ms,
1133                                query.expires_at_ms,
1134                                &query.with_metadata,
1135                            );
1136                            let (columns, values) = pairwise_columns_values(&vector_values);
1137                            let dense = find_column_value_vec_f32_any(
1138                                &columns,
1139                                &values,
1140                                &["dense", "embedding"],
1141                            )?;
1142                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1143                            let content =
1144                                find_column_value_opt_string(&columns, &values, "content");
1145                            if query.returning.is_some() {
1146                                returning_field_snaps.push(vector_values.clone());
1147                            }
1148                            let input = CreateVectorInput {
1149                                collection: query.table.clone(),
1150                                dense,
1151                                content,
1152                                metadata,
1153                                link_row: None,
1154                                link_node: None,
1155                            };
1156                            entity_outputs.push(self.create_vector(input)?);
1157                        }
1158                        InsertEntityType::Document => {
1159                            let (document_values, mut metadata) =
1160                                split_insert_metadata(self, &query.columns, row_values)?;
1161                            merge_with_clauses(
1162                                &mut metadata,
1163                                query.ttl_ms,
1164                                query.expires_at_ms,
1165                                &query.with_metadata,
1166                            );
1167                            let (columns, values) = pairwise_columns_values(&document_values);
1168                            let body_str = find_column_value_string(&columns, &values, "body")?;
1169                            let body: crate::json::Value = crate::json::from_str(&body_str)
1170                                .map_err(|e| {
1171                                    RedDBError::Query(format!("invalid JSON body: {e}"))
1172                                })?;
1173                            let input = CreateDocumentInput {
1174                                collection: query.table.clone(),
1175                                body,
1176                                metadata,
1177                                node_links: Vec::new(),
1178                                vector_links: Vec::new(),
1179                            };
1180                            let output = self.create_document(input)?;
1181                            if query.returning.is_some() {
1182                                let fields = output
1183                                    .entity
1184                                    .as_ref()
1185                                    .map(entity_row_fields_snapshot)
1186                                    .filter(|fields| !fields.is_empty())
1187                                    .unwrap_or(document_values);
1188                                returning_field_snaps.push(fields);
1189                            }
1190                            entity_outputs.push(output);
1191                        }
1192                        InsertEntityType::Kv => {
1193                            let (kv_values, mut metadata) =
1194                                split_insert_metadata(self, &query.columns, row_values)?;
1195                            merge_with_clauses(
1196                                &mut metadata,
1197                                query.ttl_ms,
1198                                query.expires_at_ms,
1199                                &query.with_metadata,
1200                            );
1201                            let (columns, values) = pairwise_columns_values(&kv_values);
1202                            let key = find_column_value_string(&columns, &values, "key")?;
1203                            let value = find_column_value(&columns, &values, "value")?;
1204                            if query.returning.is_some() {
1205                                returning_field_snaps.push(kv_values.clone());
1206                            }
1207                            let input = CreateKvInput {
1208                                collection: query.table.clone(),
1209                                key,
1210                                value,
1211                                metadata,
1212                            };
1213                            entity_outputs.push(self.create_kv(input)?);
1214                        }
1215                    }
1216
1217                    inserted_count += 1;
1218                }
1219            }
1220
1221            if let Some(items) = query.returning.as_ref() {
1222                if !entity_outputs.is_empty() {
1223                    returning_result = Some(build_returning_result(
1224                        items,
1225                        &returning_field_snaps,
1226                        Some(&entity_outputs),
1227                    ));
1228                }
1229            }
1230        }
1231
1232        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
1233        if let Some(ref embed_config) = query.auto_embed {
1234            let store = self.inner.db.store();
1235            let provider = crate::ai::parse_provider(&embed_config.provider)?;
1236            let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
1237            let model = embed_config.model.clone().unwrap_or_else(|| {
1238                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1239                    .ok()
1240                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1241            });
1242
1243            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
1244            let manager = store
1245                .get_collection(&query.table)
1246                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1247            let entities = manager.query_all(|_| true);
1248            let recent: Vec<_> = entities
1249                .into_iter()
1250                .rev()
1251                .take(effective_rows.len())
1252                .collect();
1253
1254            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
1255            let entity_combos: Vec<(usize, String)> = recent
1256                .iter()
1257                .enumerate()
1258                .filter_map(|(i, entity)| {
1259                    if let EntityData::Row(ref row) = entity.data {
1260                        if let Some(ref named) = row.named {
1261                            let texts: Vec<String> = embed_config
1262                                .fields
1263                                .iter()
1264                                .filter_map(|field| match named.get(field) {
1265                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1266                                    _ => None,
1267                                })
1268                                .collect();
1269                            if !texts.is_empty() {
1270                                return Some((i, texts.join(" ")));
1271                            }
1272                        }
1273                    }
1274                    None
1275                })
1276                .collect();
1277
1278            if !entity_combos.is_empty() {
1279                // Batch phase: single provider round-trip for all rows.
1280                let batch_texts: Vec<String> =
1281                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
1282
1283                let batch_client =
1284                    crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1285
1286                let embeddings = match tokio::runtime::Handle::try_current() {
1287                    Ok(handle) => tokio::task::block_in_place(|| {
1288                        handle.block_on(batch_client.embed_batch(
1289                            &provider,
1290                            &model,
1291                            &api_key,
1292                            batch_texts,
1293                        ))
1294                    }),
1295                    Err(_) => {
1296                        return Err(RedDBError::Query(
1297                            "AUTO EMBED requires a Tokio runtime context".to_string(),
1298                        ));
1299                    }
1300                }
1301                .map_err(|e| RedDBError::Query(e.to_string()))?;
1302
1303                // Distribute phase: persist one vector per non-empty embedding.
1304                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1305                    if dense.is_empty() {
1306                        continue;
1307                    }
1308                    self.create_vector(CreateVectorInput {
1309                        collection: query.table.clone(),
1310                        dense,
1311                        content: Some(combined.clone()),
1312                        metadata: Vec::new(),
1313                        link_row: None,
1314                        link_node: None,
1315                    })?;
1316                }
1317            }
1318        }
1319
1320        if inserted_count > 0 {
1321            self.note_table_write(&query.table);
1322        }
1323
1324        let mut result = RuntimeQueryResult::dml_result(
1325            raw_query.to_string(),
1326            inserted_count,
1327            "insert",
1328            "runtime-dml",
1329        );
1330        if let Some(returning) = returning_result {
1331            result.result = returning;
1332        }
1333        Ok(result)
1334    }
1335
1336    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1337        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1338            return Ok(());
1339        };
1340        if !auth_store.iam_authorization_enabled() {
1341            return Ok(());
1342        }
1343        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1344            return Ok(());
1345        };
1346
1347        let tenant = crate::runtime::impl_core::current_tenant();
1348        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1349        let request = crate::auth::ColumnAccessRequest {
1350            action: "insert".to_string(),
1351            schema: None,
1352            table: query.table.clone(),
1353            columns: query.columns.clone(),
1354        };
1355        let ctx = crate::auth::policies::EvalContext {
1356            principal_tenant: tenant.clone(),
1357            current_tenant: tenant,
1358            peer_ip: None,
1359            mfa_present: false,
1360            now_ms: crate::auth::now_ms(),
1361            principal_is_admin_role: role == crate::auth::Role::Admin,
1362        };
1363
1364        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1365        let table_allowed = matches!(
1366            outcome.table_decision,
1367            crate::auth::policies::Decision::Allow { .. }
1368                | crate::auth::policies::Decision::AdminBypass
1369        );
1370        if !table_allowed {
1371            return Err(RedDBError::Query(format!(
1372                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1373                outcome.table_resource.kind, outcome.table_resource.name
1374            )));
1375        }
1376        if let Some(denied) = outcome.first_denied_column() {
1377            return Err(RedDBError::Query(format!(
1378                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1379                denied.resource.kind, denied.resource.name
1380            )));
1381        }
1382
1383        Ok(())
1384    }
1385
1386    pub(crate) fn insert_timeseries_point(
1387        &self,
1388        collection: &str,
1389        fields: Vec<(String, Value)>,
1390        mut metadata: Vec<(String, MetadataValue)>,
1391    ) -> RedDBResult<EntityId> {
1392        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1393
1394        let (columns, values) = pairwise_columns_values(&fields);
1395        validate_timeseries_insert_columns(&columns)?;
1396
1397        // Issue #577 — AnalyticsSchemaRegistry hook. If the row carries
1398        // an `event_name` whose schema is registered, validate the
1399        // `payload` JSON against it BEFORE any write side-effect. On
1400        // failure we return a typed error and the row is not
1401        // persisted. When no schema is registered for the event name
1402        // (or no `event_name` column is supplied at all) we fall
1403        // through to the normal write path for back-compat with
1404        // existing timeseries rows.
1405        let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1406        let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1407        if let Some(event_name) = event_name_opt.as_deref() {
1408            let store_for_schema = self.inner.db.store();
1409            if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1410                .is_some()
1411            {
1412                let payload_json = payload_opt.as_deref().unwrap_or("{}");
1413                super::analytics_schema_registry::validate(
1414                    store_for_schema.as_ref(),
1415                    event_name,
1416                    payload_json,
1417                )
1418                .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1419            }
1420        }
1421
1422        // `metric` is required by the existing timeseries write path;
1423        // when an analytics-style row supplies `event_name` but not
1424        // `metric`, fall back to the event name so the storage path
1425        // still has a non-empty metric tag.
1426        let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1427            Some(m) => m,
1428            None => event_name_opt.clone().ok_or_else(|| {
1429                RedDBError::Query(
1430                    "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1431                )
1432            })?,
1433        };
1434        // `value` is optional for analytics-event rows (which are
1435        // semantically counts of 1); default to 1.0 when missing so
1436        // analytics inserts don't have to fabricate a metric value.
1437        let value = match find_column_value_opt_string(&columns, &values, "value") {
1438            Some(s) => s.parse::<f64>().unwrap_or(1.0),
1439            None => columns
1440                .iter()
1441                .position(|c| c.eq_ignore_ascii_case("value"))
1442                .and_then(|i| match &values[i] {
1443                    Value::Float(f) => Some(*f),
1444                    Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1445                    Value::UnsignedInteger(n) => Some(*n as f64),
1446                    _ => None,
1447                })
1448                .unwrap_or(1.0),
1449        };
1450        let timestamp_ns =
1451            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1452        let mut tags = find_timeseries_tags(&columns, &values)?;
1453        if let Some(ref name) = event_name_opt {
1454            tags.entry("event_name".to_string())
1455                .or_insert_with(|| name.clone());
1456        }
1457        if let Some(ref payload) = payload_opt {
1458            tags.entry("payload".to_string())
1459                .or_insert_with(|| payload.clone());
1460        }
1461
1462        let mut entity = UnifiedEntity::new(
1463            EntityId::new(0),
1464            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1465                series: collection.to_string(),
1466                metric: metric.clone(),
1467            })),
1468            EntityData::TimeSeries(crate::storage::TimeSeriesData {
1469                metric,
1470                timestamp_ns,
1471                value,
1472                tags,
1473            }),
1474        );
1475        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
1476        // or an autocommit xid (allocated and committed up-front so
1477        // future snapshots see the row as soon as it lands).
1478        let writer_xid = match self.current_xid() {
1479            Some(xid) => xid,
1480            None => {
1481                let mgr = self.snapshot_manager();
1482                let xid = mgr.begin();
1483                mgr.commit(xid);
1484                xid
1485            }
1486        };
1487        entity.set_xmin(writer_xid);
1488
1489        let store = self.inner.db.store();
1490        let id = store
1491            .insert_auto(collection, entity)
1492            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1493
1494        if !metadata.is_empty() {
1495            let _ = store.set_metadata(
1496                collection,
1497                id,
1498                Metadata::with_fields(metadata.into_iter().collect()),
1499            );
1500        }
1501
1502        self.cdc_emit(
1503            crate::replication::cdc::ChangeOperation::Insert,
1504            collection,
1505            id.raw(),
1506            "timeseries",
1507        );
1508
1509        Ok(id)
1510    }
1511
1512    /// Execute UPDATE table SET col=val, ... WHERE filter
1513    ///
1514    /// Scans the target collection, evaluates the WHERE filter against each
1515    /// record, and patches every matching entity.
1516    pub fn execute_update(
1517        &self,
1518        raw_query: &str,
1519        query: &UpdateQuery,
1520    ) -> RedDBResult<RuntimeQueryResult> {
1521        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1522        // Issue #523 — blockchain collections are immutable. Reject before
1523        // RLS / RETURNING work so the operator sees a clean 409-mapped
1524        // error instead of a partially-applied mutation surface.
1525        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1526            return Err(RedDBError::InvalidOperation(format!(
1527                "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1528                query.table
1529            )));
1530        }
1531        // CollectionContract gate (#50): runs the APPEND ONLY guard
1532        // (and any future contract bits) before RLS / RETURNING work
1533        // so the operator's immutability declaration is honoured
1534        // uniformly and the error message points at the DDL rather
1535        // than at a downstream symptom.
1536        crate::runtime::collection_contract::CollectionContractGate::check(
1537            self,
1538            &query.table,
1539            crate::runtime::collection_contract::MutationKind::Update,
1540        )?;
1541        ensure_update_target_contract(self, &query.table, query.target)?;
1542        ensure_graph_identity_update_target_allowed(query)?;
1543
1544        // Apply RLS augmentation first so every downstream path — plain
1545        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
1546        // same policy-filtered target set. This prevents RETURNING
1547        // from ever exposing rows the UPDATE policy would have
1548        // denied.
1549        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1550        let augmented_query: UpdateQuery;
1551        let effective_query: &UpdateQuery = if rls_gated {
1552            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1553                self,
1554                &query.table,
1555                crate::storage::query::ast::PolicyAction::Update,
1556            );
1557            let Some(policy) = rls_filter else {
1558                // No admitting policy: zero rows affected, empty
1559                // RETURNING (never leak rows the caller can't touch).
1560                let mut response = RuntimeQueryResult::dml_result(
1561                    raw_query.to_string(),
1562                    0,
1563                    "update",
1564                    "runtime-dml-rls",
1565                );
1566                if let Some(items) = query.returning.clone() {
1567                    response.result = build_returning_result(&items, &[], None);
1568                }
1569                return Ok(response);
1570            };
1571            let mut augmented = query.clone();
1572            augmented.filter = Some(match augmented.filter.take() {
1573                Some(existing) => {
1574                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1575                }
1576                None => policy,
1577            });
1578            augmented_query = augmented;
1579            &augmented_query
1580        } else {
1581            query
1582        };
1583
1584        // RETURNING wraps the inner executor and uses the touched-id
1585        // list the inner reports so the post-image reflects exactly
1586        // the rows the UPDATE actually mutated (not whatever a
1587        // separate SELECT might have observed).
1588        if let Some(items) = effective_query.returning.clone() {
1589            let mut inner_query = effective_query.clone();
1590            inner_query.returning = None;
1591            let (mut response, touched_ids) =
1592                self.execute_update_inner_tracked(raw_query, &inner_query)?;
1593
1594            let snapshots = if matches!(
1595                effective_query.target,
1596                UpdateTarget::Nodes | UpdateTarget::Edges
1597            ) {
1598                graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1599            } else {
1600                super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1601                    .row_snapshots(&touched_ids)
1602            };
1603
1604            response.result = build_returning_result(&items, &snapshots, None);
1605            response.engine = "runtime-dml-returning";
1606            return Ok(response);
1607        }
1608
1609        self.execute_update_inner(raw_query, effective_query)
1610    }
1611
1612    /// Back-compat shim: the older entry point ignored touched ids.
1613    fn execute_update_inner(
1614        &self,
1615        raw_query: &str,
1616        query: &UpdateQuery,
1617    ) -> RedDBResult<RuntimeQueryResult> {
1618        self.execute_update_inner_tracked(raw_query, query)
1619            .map(|(res, _)| res)
1620    }
1621
1622    fn execute_update_inner_tracked(
1623        &self,
1624        raw_query: &str,
1625        query: &UpdateQuery,
1626    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1627        let store = self.inner.db.store();
1628        let effective_filter = effective_update_filter(query);
1629        let compiled_plan = self.compile_update_plan(query)?;
1630        let mut touched_ids: Vec<EntityId> = Vec::new();
1631        let limit_cap = query.limit.map(|l| l as usize);
1632        let manager = store
1633            .get_collection(&query.table)
1634            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1635        let scan_limit = if query.order_by.is_empty() {
1636            limit_cap
1637        } else {
1638            None
1639        };
1640        let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
1641            self,
1642            &query.table,
1643            effective_filter.as_ref(),
1644            scan_limit,
1645            query.target,
1646        )
1647        .find_target_ids()?;
1648        let ids_to_update = if query.order_by.is_empty() {
1649            ids_to_update
1650        } else {
1651            ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1652        };
1653
1654        if update_needs_rmw_lock(query) {
1655            return self.execute_update_inner_tracked_locked(
1656                raw_query,
1657                query,
1658                &compiled_plan,
1659                &ids_to_update,
1660                effective_filter.as_ref(),
1661            );
1662        }
1663
1664        let mut affected: u64 = 0;
1665        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1666            let mut applied_chunk = Vec::with_capacity(chunk.len());
1667            for entity in manager.get_many(chunk).into_iter().flatten() {
1668                let assignments =
1669                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1670                let applied = self.apply_materialized_update_for_entity(
1671                    query.table.clone(),
1672                    entity,
1673                    &compiled_plan,
1674                    assignments,
1675                )?;
1676                touched_ids.push(applied.id);
1677                applied_chunk.push(applied);
1678            }
1679            self.persist_update_chunk(&applied_chunk)?;
1680            affected += applied_chunk.len() as u64;
1681            let lsns = self.flush_update_chunk(&applied_chunk)?;
1682            if !query.suppress_events {
1683                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1684            }
1685        }
1686
1687        if affected > 0 {
1688            self.note_table_write(&query.table);
1689        }
1690
1691        Ok((
1692            RuntimeQueryResult::dml_result(
1693                raw_query.to_string(),
1694                affected,
1695                "update",
1696                "runtime-dml",
1697            ),
1698            touched_ids,
1699        ))
1700    }
1701
1702    fn execute_update_inner_tracked_locked(
1703        &self,
1704        raw_query: &str,
1705        query: &UpdateQuery,
1706        compiled_plan: &CompiledUpdatePlan,
1707        ids_to_update: &[EntityId],
1708        effective_filter: Option<&Filter>,
1709    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1710        let store = self.inner.db.store();
1711        let mut touched_ids = Vec::new();
1712        let mut lock_entries = Vec::new();
1713
1714        for id in ids_to_update {
1715            let Some(candidate) = store.get(&query.table, *id) else {
1716                continue;
1717            };
1718            let logical_id = candidate.logical_id();
1719            let lock_key = format!("row:{}", logical_id.raw());
1720            let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1721            lock_entries.push((lock_key, logical_id, rmw_lock));
1722        }
1723
1724        lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1725        lock_entries.dedup_by(|left, right| left.0 == right.0);
1726        let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1727
1728        let mut applied_chunk = Vec::new();
1729        for (_, logical_id, _) in &lock_entries {
1730            let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1731            else {
1732                continue;
1733            };
1734            if let Some(filter) = effective_filter {
1735                if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1736                    Some(self.inner.db.as_ref()),
1737                    &entity,
1738                    filter,
1739                    &query.table,
1740                    &query.table,
1741                ) {
1742                    continue;
1743                }
1744            }
1745
1746            let assignments =
1747                self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1748            let applied = self.apply_materialized_update_for_entity(
1749                query.table.clone(),
1750                entity,
1751                compiled_plan,
1752                assignments,
1753            )?;
1754            touched_ids.push(applied.id);
1755            applied_chunk.push(applied);
1756        }
1757
1758        let affected = applied_chunk.len() as u64;
1759        if !applied_chunk.is_empty() {
1760            self.persist_update_chunk(&applied_chunk)?;
1761            let lsns = self.flush_update_chunk(&applied_chunk)?;
1762            if !query.suppress_events {
1763                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1764            }
1765        }
1766
1767        if affected > 0 {
1768            self.note_table_write(&query.table);
1769        }
1770
1771        Ok((
1772            RuntimeQueryResult::dml_result(
1773                raw_query.to_string(),
1774                affected,
1775                "update",
1776                "runtime-dml",
1777            ),
1778            touched_ids,
1779        ))
1780    }
1781
1782    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1783        let mut static_field_assignments = Vec::new();
1784        let mut static_metadata_assignments = Vec::new();
1785        let mut dynamic_assignments = Vec::new();
1786        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1787        let mut row_modified_columns = Vec::new();
1788
1789        for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1790            let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1791            let metadata_key = resolve_sql_ttl_metadata_key(column);
1792            if compound_op.is_some() && metadata_key.is_some() {
1793                return Err(RedDBError::Query(format!(
1794                    "compound assignment is only supported for row fields: {column}"
1795                )));
1796            }
1797            if compound_op.is_none() {
1798                if let Ok(value) = fold_expr_to_value(expr.clone()) {
1799                    if let Some(metadata_key) = metadata_key {
1800                        let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1801                        let (canonical_key, canonical_value) =
1802                            canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1803                        static_metadata_assignments
1804                            .push((canonical_key.to_string(), canonical_value));
1805                    } else {
1806                        let value = self.resolve_crypto_sentinel(value)?;
1807                        static_field_assignments.push((
1808                            column.clone(),
1809                            normalize_row_update_assignment_with_plan(
1810                                &query.table,
1811                                column,
1812                                value,
1813                                row_contract_plan.as_ref(),
1814                            )?,
1815                        ));
1816                        row_modified_columns.push(column.clone());
1817                    }
1818                    continue;
1819                }
1820            }
1821
1822            dynamic_assignments.push(CompiledUpdateAssignment {
1823                column: column.clone(),
1824                expr: expr.clone(),
1825                compound_op,
1826                metadata_key,
1827                row_rule: if metadata_key.is_none() {
1828                    if let Some(plan) = row_contract_plan.as_ref() {
1829                        if plan.timestamps_enabled
1830                            && (column == "created_at" || column == "updated_at")
1831                        {
1832                            return Err(RedDBError::Query(format!(
1833                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1834                                query.table, column
1835                            )));
1836                        }
1837                        if let Some(rule) = plan.declared_rules.get(column) {
1838                            Some(rule.clone())
1839                        } else if plan.strict_schema {
1840                            return Err(RedDBError::Query(format!(
1841                                "collection '{}' is strict and does not allow undeclared fields: {}",
1842                                query.table, column
1843                            )));
1844                        } else {
1845                            None
1846                        }
1847                    } else {
1848                        None
1849                    }
1850                } else {
1851                    None
1852                },
1853            });
1854            if metadata_key.is_none() {
1855                row_modified_columns.push(column.clone());
1856            }
1857        }
1858
1859        let row_modified_columns = dedupe_update_columns(row_modified_columns);
1860        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1861            row_modified_columns.iter().any(|column| {
1862                plan.unique_columns
1863                    .keys()
1864                    .any(|unique| unique.eq_ignore_ascii_case(column))
1865            })
1866        });
1867
1868        if let Some(ttl_ms) = query.ttl_ms {
1869            static_metadata_assignments
1870                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1871        }
1872        if let Some(expires_at_ms) = query.expires_at_ms {
1873            static_metadata_assignments.push((
1874                "_expires_at".to_string(),
1875                metadata_u64_to_value(expires_at_ms),
1876            ));
1877        }
1878        for (key, val) in &query.with_metadata {
1879            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1880        }
1881
1882        Ok(CompiledUpdatePlan {
1883            static_field_assignments,
1884            static_metadata_assignments,
1885            dynamic_assignments,
1886            row_contract_plan,
1887            row_modified_columns,
1888            row_touches_unique_columns,
1889        })
1890    }
1891
1892    fn materialize_update_assignments_for_entity(
1893        &self,
1894        query: &UpdateQuery,
1895        entity: &UnifiedEntity,
1896        compiled_plan: &CompiledUpdatePlan,
1897    ) -> RedDBResult<MaterializedUpdateAssignments> {
1898        let mut assignments = MaterializedUpdateAssignments::default();
1899        let mut record: Option<UnifiedRecord> = None;
1900
1901        for assignment in &compiled_plan.dynamic_assignments {
1902            if assignment.compound_op.is_some()
1903                && !matches!(
1904                    entity.data,
1905                    EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
1906                )
1907            {
1908                return Err(RedDBError::Query(format!(
1909                    "compound assignment is only supported for row or graph UPDATE column '{}'",
1910                    assignment.column
1911                )));
1912            }
1913            if record.is_none() {
1914                record = runtime_any_record_from_entity_ref(entity);
1915            }
1916            let Some(record) = record.as_ref() else {
1917                return Err(RedDBError::Query(format!(
1918                    "UPDATE could not materialize runtime record for entity {} in '{}'",
1919                    entity.id.raw(),
1920                    query.table
1921                )));
1922            };
1923            let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
1924                Some(self.inner.db.as_ref()),
1925                &assignment.expr,
1926                record,
1927                Some(query.table.as_str()),
1928                Some(query.table.as_str()),
1929            )
1930            .ok_or_else(|| {
1931                RedDBError::Query(format!(
1932                    "failed to evaluate UPDATE expression for column '{}'",
1933                    assignment.column
1934                ))
1935            })?;
1936            let value = if let Some(op) = assignment.compound_op {
1937                evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
1938            } else {
1939                rhs
1940            };
1941
1942            if let Some(metadata_key) = assignment.metadata_key {
1943                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1944                let (canonical_key, canonical_value) =
1945                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1946                assignments
1947                    .dynamic_metadata_assignments
1948                    .push((canonical_key.to_string(), canonical_value));
1949            } else {
1950                assignments.dynamic_field_assignments.push((
1951                    assignment.column.clone(),
1952                    normalize_row_update_value_for_rule(
1953                        &query.table,
1954                        self.resolve_crypto_sentinel(value)?,
1955                        assignment.row_rule.as_ref(),
1956                    )?,
1957                ));
1958            }
1959        }
1960
1961        Ok(assignments)
1962    }
1963
1964    fn apply_materialized_update_for_entity(
1965        &self,
1966        collection: String,
1967        entity: UnifiedEntity,
1968        compiled_plan: &CompiledUpdatePlan,
1969        assignments: MaterializedUpdateAssignments,
1970    ) -> RedDBResult<AppliedEntityMutation> {
1971        if matches!(entity.data, EntityData::Row(_)) {
1972            return self.apply_loaded_sql_update_row_core(
1973                collection,
1974                entity,
1975                &compiled_plan.static_field_assignments,
1976                assignments.dynamic_field_assignments,
1977                &compiled_plan.static_metadata_assignments,
1978                assignments.dynamic_metadata_assignments,
1979                compiled_plan.row_contract_plan.as_ref(),
1980                &compiled_plan.row_modified_columns,
1981                compiled_plan.row_touches_unique_columns,
1982            );
1983        }
1984
1985        ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
1986
1987        let operations = build_patch_operations_from_materialized_assignments(
1988            &entity,
1989            compiled_plan,
1990            assignments,
1991        );
1992        self.apply_loaded_patch_entity_core(
1993            collection,
1994            entity,
1995            crate::json::Value::Null,
1996            operations,
1997        )
1998    }
1999
2000    /// Execute DELETE FROM table WHERE filter
2001    pub fn execute_delete(
2002        &self,
2003        raw_query: &str,
2004        query: &DeleteQuery,
2005    ) -> RedDBResult<RuntimeQueryResult> {
2006        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2007        // Issue #523 — blockchain collections are immutable; see
2008        // execute_update for the same gate.
2009        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2010            return Err(RedDBError::InvalidOperation(format!(
2011                "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2012                query.table
2013            )));
2014        }
2015        // CollectionContract gate (#50) — see execute_update for
2016        // rationale. The gate handles APPEND ONLY rejection and is
2017        // the single point where future contract bits land.
2018        crate::runtime::collection_contract::CollectionContractGate::check(
2019            self,
2020            &query.table,
2021            crate::runtime::collection_contract::MutationKind::Delete,
2022        )?;
2023
2024        // RETURNING on DELETE: capture the pre-image via an internal
2025        // SELECT that reuses the same WHERE, then run the delete with
2026        // the RETURNING clause stripped, then project the captured
2027        // rows through the requested items. The extra SELECT is a
2028        // pragmatic MVP — a future pass can fuse the scan with the
2029        // delete to avoid the second pass over the heap.
2030        if let Some(items) = query.returning.clone() {
2031            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2032                RedDBError::Query(
2033                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2034                )
2035            })?;
2036            let captured = self.execute_query(&select_sql)?;
2037
2038            let mut inner_query = query.clone();
2039            inner_query.returning = None;
2040            let _ = self.execute_delete(raw_query, &inner_query)?;
2041
2042            let snapshots: Vec<Vec<(String, Value)>> = captured
2043                .result
2044                .records
2045                .iter()
2046                .map(|rec| {
2047                    rec.iter_fields()
2048                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2049                        .collect()
2050                })
2051                .collect();
2052            let affected = snapshots.len() as u64;
2053            let result = build_returning_result(&items, &snapshots, None);
2054
2055            let mut response = RuntimeQueryResult::dml_result(
2056                raw_query.to_string(),
2057                affected,
2058                "delete",
2059                "runtime-dml-returning",
2060            );
2061            response.result = result;
2062            return Ok(response);
2063        }
2064        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
2065        //
2066        // When the table has RLS enabled, gate the DELETE by the
2067        // per-role policy set: mutations only touch rows that *every*
2068        // matching `FOR DELETE` policy would accept. No policies =>
2069        // zero rows affected (PG restrictive-default).
2070        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2071            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2072                self,
2073                &query.table,
2074                crate::storage::query::ast::PolicyAction::Delete,
2075            );
2076            let Some(policy) = rls_filter else {
2077                return Ok(RuntimeQueryResult::dml_result(
2078                    raw_query.to_string(),
2079                    0,
2080                    "delete",
2081                    "runtime-dml-rls",
2082                ));
2083            };
2084            // Fold the policy predicate into the user's WHERE before
2085            // dispatching — the remainder of this function reads the
2086            // filter from `query` via `effective_delete_filter`, which
2087            // respects the updated value.
2088            let mut augmented = query.clone();
2089            augmented.filter = Some(match augmented.filter.take() {
2090                Some(existing) => {
2091                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2092                }
2093                None => policy,
2094            });
2095            return self.execute_delete_inner(raw_query, &augmented);
2096        }
2097        self.execute_delete_inner(raw_query, query)
2098    }
2099
2100    fn execute_delete_inner(
2101        &self,
2102        raw_query: &str,
2103        query: &DeleteQuery,
2104    ) -> RedDBResult<RuntimeQueryResult> {
2105        let effective_filter = effective_delete_filter(query);
2106
2107        // Find the rows that match the WHERE clause. The "find target
2108        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
2109        // the same scan strategy.
2110        let scan = super::dml_target_scan::DmlTargetScan::new(
2111            self,
2112            &query.table,
2113            effective_filter.as_ref(),
2114            None,
2115        );
2116        let ids_to_delete = scan.find_target_ids()?;
2117
2118        // For event-enabled collections, snapshot the pre-delete state
2119        // before rows are physically removed.
2120        let needs_delete_events =
2121            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2122        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2123            scan.row_json_pre_images(&ids_to_delete)
2124        } else {
2125            HashMap::new()
2126        };
2127
2128        let mut affected: u64 = 0;
2129        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2130            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2131            affected += count;
2132            if needs_delete_events && !lsns.is_empty() {
2133                // lsns.len() == actually-deleted entities; align with chunk ids.
2134                // `delete_batch` may skip missing entities, so we correlate by
2135                // the number returned (they're emitted in chunk order).
2136                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2137                self.emit_delete_events_for_collection(
2138                    &query.table,
2139                    deleted_chunk,
2140                    &lsns,
2141                    &pre_images,
2142                )?;
2143            }
2144        }
2145        pre_images.clear();
2146
2147        if affected > 0 {
2148            self.note_table_write(&query.table);
2149        }
2150
2151        Ok(RuntimeQueryResult::dml_result(
2152            raw_query.to_string(),
2153            affected,
2154            "delete",
2155            "runtime-dml",
2156        ))
2157    }
2158}
2159
2160/// Reject UPDATE … NODES/EDGES that assign to graph identity/topology
2161/// columns regardless of whether any row matches the WHERE clause. The
2162/// per-entity guard below covers only the matched-rows case, but ADR 0019
2163/// declares these columns immutable on the surface itself, so a zero-row
2164/// UPDATE should still surface the same error to operators and SDKs.
2165fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2166    if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2167        return Ok(());
2168    }
2169    for (column, _) in &query.assignment_exprs {
2170        if is_immutable_graph_identity_field(column) {
2171            return Err(RedDBError::Query(format!(
2172                "immutable graph field '{column}' cannot be updated"
2173            )));
2174        }
2175    }
2176    Ok(())
2177}
2178
2179fn ensure_graph_identity_update_allowed(
2180    entity: &UnifiedEntity,
2181    compiled_plan: &CompiledUpdatePlan,
2182    assignments: &MaterializedUpdateAssignments,
2183) -> RedDBResult<()> {
2184    if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2185        return Ok(());
2186    }
2187
2188    for (column, _) in compiled_plan
2189        .static_field_assignments
2190        .iter()
2191        .chain(assignments.dynamic_field_assignments.iter())
2192    {
2193        if is_immutable_graph_identity_field(column) {
2194            return Err(RedDBError::Query(format!(
2195                "immutable graph field '{column}' cannot be updated"
2196            )));
2197        }
2198    }
2199
2200    Ok(())
2201}
2202
/// Returns `true` when `column` names one of the graph identity/topology
/// columns that UPDATE may not assign to. The comparison ignores ASCII case.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const IMMUTABLE_GRAPH_FIELDS: [&str; 6] =
        ["rid", "label", "from_rid", "to_rid", "from", "to"];

    for reserved in IMMUTABLE_GRAPH_FIELDS.iter() {
        if reserved.eq_ignore_ascii_case(column) {
            return true;
        }
    }
    false
}
2208
2209fn build_patch_operations_from_materialized_assignments(
2210    entity: &UnifiedEntity,
2211    compiled_plan: &CompiledUpdatePlan,
2212    assignments: MaterializedUpdateAssignments,
2213) -> Vec<PatchEntityOperation> {
2214    let mut operations = Vec::with_capacity(
2215        compiled_plan.static_field_assignments.len()
2216            + compiled_plan.static_metadata_assignments.len()
2217            + assignments.dynamic_field_assignments.len()
2218            + assignments.dynamic_metadata_assignments.len(),
2219    );
2220
2221    for (column, value) in &compiled_plan.static_field_assignments {
2222        operations.push(PatchEntityOperation {
2223            op: PatchEntityOperationType::Set,
2224            path: update_patch_path_for_entity(entity, column),
2225            value: Some(storage_value_to_json(value)),
2226        });
2227    }
2228
2229    for (column, value) in assignments.dynamic_field_assignments {
2230        operations.push(PatchEntityOperation {
2231            op: PatchEntityOperationType::Set,
2232            path: update_patch_path_for_entity(entity, &column),
2233            value: Some(storage_value_to_json(&value)),
2234        });
2235    }
2236
2237    for (key, value) in &compiled_plan.static_metadata_assignments {
2238        operations.push(PatchEntityOperation {
2239            op: PatchEntityOperationType::Set,
2240            path: vec!["metadata".to_string(), key.clone()],
2241            value: Some(metadata_value_to_json(value)),
2242        });
2243    }
2244
2245    for (key, value) in assignments.dynamic_metadata_assignments {
2246        operations.push(PatchEntityOperation {
2247            op: PatchEntityOperationType::Set,
2248            path: vec!["metadata".to_string(), key],
2249            value: Some(metadata_value_to_json(&value)),
2250        });
2251    }
2252
2253    operations
2254}
2255
2256fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2257    if matches!(
2258        (&entity.kind, &entity.data),
2259        (
2260            crate::storage::EntityKind::GraphNode(_),
2261            EntityData::Node(_)
2262        )
2263    ) && column.eq_ignore_ascii_case("node_type")
2264    {
2265        return vec!["node_type".to_string()];
2266    }
2267    if matches!(
2268        (&entity.kind, &entity.data),
2269        (
2270            crate::storage::EntityKind::GraphEdge(_),
2271            EntityData::Edge(_)
2272        )
2273    ) && column.eq_ignore_ascii_case("weight")
2274    {
2275        return vec!["weight".to_string()];
2276    }
2277    vec!["fields".to_string(), column.to_string()]
2278}
2279
/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
/// `SELECT * FROM <table> [WHERE …]` so the delete executor can capture the
/// pre-image before actually removing the rows.
///
/// Returns `None` when the statement does not start with the `DELETE`
/// keyword or when no whitespace-bounded `FROM` follows it.
///
/// Keywords are matched ignoring ASCII case, and any ASCII whitespace
/// (spaces, tabs, newlines) may separate `DELETE`, `FROM` and the table
/// name, so multi-line statements are rewritten too. Everything between the
/// table name and the RETURNING clause is preserved verbatim, so
/// WHERE / ORDER BY / LIMIT survive untouched. The RETURNING tail — if
/// present — is cut at the first top-level `RETURNING` token.
fn delete_to_select_sql(sql: &str) -> Option<String> {
    const DELETE_KW: &str = "delete";
    const FROM_KW: &str = "from";

    let trimmed = sql.trim_start();
    // ASCII lowercasing never changes byte lengths, so offsets found in
    // `lowered` are valid offsets into `trimmed`.
    let lowered = trimmed.to_ascii_lowercase();

    // `DELETE` must be a complete leading keyword (not e.g. `deleted`).
    let after_delete = lowered.strip_prefix(DELETE_KW)?;
    if !after_delete.starts_with(|c: char| c.is_ascii_whitespace()) {
        return None;
    }

    // Locate FROM with the same whitespace-bounded scan used for RETURNING.
    let from_at = find_top_level_keyword(after_delete, FROM_KW)?;
    let body_start = DELETE_KW.len() + from_at + FROM_KW.len();

    // Both views are trimmed identically, keeping their offsets aligned.
    let body = trimmed[body_start..].trim_start();
    let body_lc = lowered[body_start..].trim_start();

    // Drop the RETURNING tail. The clause appears at most once at top level
    // in our grammar, and occurrences inside string literals are skipped.
    let kept = match find_top_level_keyword(body_lc, "returning") {
        Some(pos) => &body[..pos],
        None => body,
    };
    Some(format!("SELECT * FROM {}", kept.trim_end()))
}

/// Find the byte offset of the first whitespace-bounded occurrence of
/// `needle` in a lowercased `haystack`, skipping matches inside
/// single-quoted string literals.
///
/// "Whitespace-bounded" means the match starts at offset 0 or right after
/// an ASCII whitespace byte, and ends at the end of the haystack or right
/// before an ASCII whitespace byte. Quote tracking is a plain toggle on
/// every `'` with no escape handling, which is enough for the shapes the
/// DML parser emits.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let needle = needle.as_bytes();
    let mut in_string = false;

    for (idx, &byte) in bytes.iter().enumerate() {
        if byte == b'\'' {
            in_string = !in_string;
            continue;
        }
        if in_string || !bytes[idx..].starts_with(needle) {
            continue;
        }

        let end = idx + needle.len();
        let bounded_left = idx == 0 || bytes[idx - 1].is_ascii_whitespace();
        let bounded_right = end == bytes.len() || bytes[end].is_ascii_whitespace();
        if bounded_left && bounded_right {
            return Some(idx);
        }
    }
    None
}
2339
2340/// Build a `UnifiedResult` from the rows affected by a DML statement plus
2341/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
2342/// for one affected row; `outputs`, when provided, supplies the engine-
2343/// assigned entity id for the same row (INSERT path). Projection honours
2344/// the RETURNING items: `*` expands to every snapshot column plus
2345/// the public row envelope when available.
2346fn build_returning_result(
2347    items: &[ReturningItem],
2348    snapshots: &[Vec<(String, Value)>],
2349    outputs: Option<&[CreateEntityOutput]>,
2350) -> UnifiedResult {
2351    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2352    let public_item_outputs = outputs.is_some_and(|outs| {
2353        outs.first()
2354            .and_then(|out| out.entity.as_ref())
2355            .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2356    });
2357
2358    let mut columns: Vec<String> = if project_all {
2359        let mut cols: Vec<String> = Vec::new();
2360        if public_item_outputs {
2361            cols.extend(
2362                [
2363                    "rid",
2364                    "collection",
2365                    "kind",
2366                    "tenant",
2367                    "created_at",
2368                    "updated_at",
2369                ]
2370                .into_iter()
2371                .map(str::to_string),
2372            );
2373        } else if outputs.is_some() {
2374            cols.push("red_entity_id".to_string());
2375        }
2376        if let Some(first) = snapshots.first() {
2377            for (name, _) in first {
2378                cols.push(name.clone());
2379            }
2380        }
2381        cols
2382    } else {
2383        items
2384            .iter()
2385            .filter_map(|it| match it {
2386                ReturningItem::Column(c) => Some(c.clone()),
2387                ReturningItem::All => None,
2388            })
2389            .collect()
2390    };
2391    // Guarantee unique order-preserving column list.
2392    {
2393        let mut seen = std::collections::HashSet::new();
2394        columns.retain(|c| seen.insert(c.clone()));
2395    }
2396
2397    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2398    for (idx, snap) in snapshots.iter().enumerate() {
2399        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2400        if let Some(outs) = outputs {
2401            if let Some(out) = outs.get(idx) {
2402                if let Some(entity) = out.entity.as_ref() {
2403                    if let Some(kind) = public_returning_item_kind(entity) {
2404                        values.insert(
2405                            Arc::clone(&sys_key_rid()),
2406                            Value::UnsignedInteger(out.id.raw()),
2407                        );
2408                        values.insert(
2409                            Arc::clone(&sys_key_collection()),
2410                            Value::text(entity.kind.collection().to_string()),
2411                        );
2412                        values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2413                        values.insert(
2414                            Arc::clone(&sys_key_created_at()),
2415                            Value::UnsignedInteger(entity.created_at),
2416                        );
2417                        values.insert(
2418                            Arc::clone(&sys_key_updated_at()),
2419                            Value::UnsignedInteger(entity.updated_at),
2420                        );
2421                    } else {
2422                        values.insert(
2423                            Arc::clone(&sys_key_red_entity_id()),
2424                            Value::Integer(out.id.raw() as i64),
2425                        );
2426                    }
2427                } else {
2428                    values.insert(
2429                        Arc::clone(&sys_key_red_entity_id()),
2430                        Value::Integer(out.id.raw() as i64),
2431                    );
2432                }
2433            }
2434        }
2435        for (name, val) in snap {
2436            values.insert(Arc::from(name.as_str()), val.clone());
2437        }
2438        if !values.contains_key("tenant") {
2439            let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2440            values.insert(Arc::clone(&sys_key_tenant()), tenant);
2441        }
2442        let mut rec = UnifiedRecord::default();
2443        // Only keep projected columns on the record.
2444        for col in &columns {
2445            if let Some(v) = values.get(col.as_str()) {
2446                rec.set_arc(Arc::from(col.as_str()), v.clone());
2447            }
2448        }
2449        records.push(rec);
2450    }
2451
2452    UnifiedResult {
2453        columns,
2454        records,
2455        stats: Default::default(),
2456        pre_serialized_json: None,
2457    }
2458}
2459
2460fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2461    match (&entity.kind, &entity.data) {
2462        (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2463            Some("node")
2464        }
2465        (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2466            Some("edge")
2467        }
2468        (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2469        _ => None,
2470    }
2471}
2472
2473fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2474    let Some(row) = entity.data.as_row() else {
2475        return "row";
2476    };
2477
2478    let is_kv = row.named.as_ref().is_some_and(|named| {
2479        (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2480            || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2481    });
2482    if is_kv {
2483        return "kv";
2484    }
2485
2486    let is_document = row
2487        .named
2488        .as_ref()
2489        .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2490        || row.columns.iter().any(runtime_returning_documentish_value);
2491    if is_document {
2492        "document"
2493    } else {
2494        "row"
2495    }
2496}
2497
2498fn runtime_returning_documentish_value(value: &Value) -> bool {
2499    matches!(value, Value::Json(_) | Value::Blob(_))
2500}
2501
2502fn row_insert_returning_snapshots(
2503    outputs: &[CreateEntityOutput],
2504    fallback: Vec<Vec<(String, Value)>>,
2505) -> Vec<Vec<(String, Value)>> {
2506    outputs
2507        .iter()
2508        .enumerate()
2509        .map(|(idx, out)| {
2510            out.entity
2511                .as_ref()
2512                .map(entity_row_fields_snapshot)
2513                .filter(|snap| !snap.is_empty())
2514                .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2515        })
2516        .collect()
2517}
2518
2519fn graph_insert_returning_snapshots(
2520    store: &crate::storage::unified::UnifiedStore,
2521    collection: &str,
2522    ids: &[EntityId],
2523) -> Vec<Vec<(String, Value)>> {
2524    let Some(manager) = store.get_collection(collection) else {
2525        return Vec::new();
2526    };
2527
2528    ids.iter()
2529        .filter_map(|id| manager.get(*id))
2530        .filter_map(|entity| {
2531            let mut record = runtime_any_record_from_entity_ref(&entity)?;
2532            record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2533            Some(record)
2534        })
2535        .map(|record| {
2536            record
2537                .iter_fields()
2538                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2539                .collect()
2540        })
2541        .collect()
2542}
2543
2544fn graph_update_returning_snapshots(
2545    runtime: &RedDBRuntime,
2546    collection: &str,
2547    ids: &[EntityId],
2548) -> Vec<Vec<(String, Value)>> {
2549    let store = runtime.db().store();
2550    let Some(manager) = store.get_collection(collection) else {
2551        return Vec::new();
2552    };
2553
2554    manager
2555        .get_many(ids)
2556        .into_iter()
2557        .flatten()
2558        .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2559        .map(|record| {
2560            record
2561                .iter_fields()
2562                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2563                .collect()
2564        })
2565        .collect()
2566}
2567
2568fn ensure_update_target_contract(
2569    runtime: &RedDBRuntime,
2570    collection: &str,
2571    target: UpdateTarget,
2572) -> RedDBResult<()> {
2573    let Some(contract) = runtime.db().collection_contract(collection) else {
2574        return Ok(());
2575    };
2576    if update_target_contract_is_advisory(&contract)
2577        || update_target_allows_model(contract.declared_model, update_target_model(target))
2578    {
2579        return Ok(());
2580    }
2581    Err(RedDBError::InvalidOperation(format!(
2582        "collection '{}' is declared as '{}' and does not allow '{}' updates",
2583        collection,
2584        update_model_name(contract.declared_model),
2585        update_model_name(update_target_model(target))
2586    )))
2587}
2588
2589fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2590    matches!(
2591        (&contract.origin, &contract.schema_mode),
2592        (
2593            crate::physical::ContractOrigin::Implicit,
2594            crate::catalog::SchemaMode::Dynamic,
2595        )
2596    )
2597}
2598
2599fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2600    match target {
2601        UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2602        UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2603        UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2604        UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2605    }
2606}
2607
2608fn update_target_allows_model(
2609    declared_model: crate::catalog::CollectionModel,
2610    requested_model: crate::catalog::CollectionModel,
2611) -> bool {
2612    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2613}
2614
2615fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2616    match model {
2617        crate::catalog::CollectionModel::Table => "table",
2618        crate::catalog::CollectionModel::Document => "document",
2619        crate::catalog::CollectionModel::Graph => "graph",
2620        crate::catalog::CollectionModel::Vector => "vector",
2621        crate::catalog::CollectionModel::Hll => "hll",
2622        crate::catalog::CollectionModel::Sketch => "sketch",
2623        crate::catalog::CollectionModel::Filter => "filter",
2624        crate::catalog::CollectionModel::Kv => "kv",
2625        crate::catalog::CollectionModel::Config => "config",
2626        crate::catalog::CollectionModel::Vault => "vault",
2627        crate::catalog::CollectionModel::Mixed => "mixed",
2628        crate::catalog::CollectionModel::TimeSeries => "timeseries",
2629        crate::catalog::CollectionModel::Queue => "queue",
2630        crate::catalog::CollectionModel::Metrics => "metrics",
2631    }
2632}
2633
2634fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2635    let db = runtime.db();
2636    if let Some(contract) = db.collection_contract(collection) {
2637        let advisory_implicit_dynamic = matches!(
2638            (&contract.origin, &contract.schema_mode),
2639            (
2640                crate::physical::ContractOrigin::Implicit,
2641                crate::catalog::SchemaMode::Dynamic,
2642            )
2643        );
2644        if advisory_implicit_dynamic
2645            || matches!(
2646                contract.declared_model,
2647                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2648            )
2649        {
2650            return Ok(());
2651        }
2652        return Err(RedDBError::InvalidOperation(format!(
2653            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2654            collection, contract.declared_model
2655        )));
2656    }
2657
2658    let now = std::time::SystemTime::now()
2659        .duration_since(std::time::UNIX_EPOCH)
2660        .unwrap_or_default()
2661        .as_millis();
2662    db.save_collection_contract(crate::physical::CollectionContract {
2663        name: collection.to_string(),
2664        declared_model: crate::catalog::CollectionModel::Graph,
2665        schema_mode: crate::catalog::SchemaMode::Dynamic,
2666        origin: crate::physical::ContractOrigin::Implicit,
2667        version: 1,
2668        created_at_unix_ms: now,
2669        updated_at_unix_ms: now,
2670        default_ttl_ms: db.collection_default_ttl_ms(collection),
2671        vector_dimension: None,
2672        vector_metric: None,
2673        context_index_fields: Vec::new(),
2674        declared_columns: Vec::new(),
2675        table_def: None,
2676        timestamps_enabled: false,
2677        context_index_enabled: false,
2678        metrics_raw_retention_ms: None,
2679        metrics_rollup_policies: Vec::new(),
2680        metrics_tenant_identity: None,
2681        metrics_namespace: None,
2682        append_only: false,
2683        subscriptions: Vec::new(),
2684        session_key: None,
2685        session_gap_ms: None,
2686        retention_duration_ms: None,
2687    })
2688    .map(|_| ())
2689    .map_err(|err| RedDBError::Internal(err.to_string()))
2690}
2691
2692fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2693    query
2694        .assignment_exprs
2695        .iter()
2696        .enumerate()
2697        .any(|(idx, (column, expr))| {
2698            query
2699                .compound_assignment_ops
2700                .get(idx)
2701                .is_some_and(|op| op.is_some())
2702                || expr_references_update_column(expr, &query.table, column)
2703        })
2704}
2705
2706fn evaluate_compound_update_assignment(
2707    column: &str,
2708    record: &UnifiedRecord,
2709    op: BinOp,
2710    rhs: Value,
2711) -> RedDBResult<Value> {
2712    let lhs = record.get(column).ok_or_else(|| {
2713        RedDBError::Query(format!(
2714            "compound assignment requires existing numeric field '{column}'"
2715        ))
2716    })?;
2717    if matches!(lhs, Value::Null) {
2718        return Err(RedDBError::Query(format!(
2719            "compound assignment requires non-null numeric field '{column}'"
2720        )));
2721    }
2722    apply_compound_numeric_op(column, op, lhs, &rhs)
2723}
2724
2725fn apply_compound_numeric_op(
2726    column: &str,
2727    op: BinOp,
2728    lhs: &Value,
2729    rhs: &Value,
2730) -> RedDBResult<Value> {
2731    let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2732        return Err(RedDBError::Query(format!(
2733            "compound assignment requires numeric field '{column}'"
2734        )));
2735    };
2736    let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2737        return Err(RedDBError::Query(format!(
2738            "compound assignment requires numeric right-hand value for field '{column}'"
2739        )));
2740    };
2741
2742    if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2743        let a = lhs_number.as_f64();
2744        let b = rhs_number.as_f64();
2745        let out = match op {
2746            BinOp::Add => a + b,
2747            BinOp::Sub => a - b,
2748            BinOp::Mul => a * b,
2749            BinOp::Div => {
2750                if b == 0.0 {
2751                    return Err(RedDBError::Query(format!(
2752                        "division by zero in compound assignment for field '{column}'"
2753                    )));
2754                }
2755                a / b
2756            }
2757            BinOp::Mod => {
2758                if b == 0.0 {
2759                    return Err(RedDBError::Query(format!(
2760                        "modulo by zero in compound assignment for field '{column}'"
2761                    )));
2762                }
2763                a % b
2764            }
2765            _ => {
2766                return Err(RedDBError::Query(format!(
2767                    "unsupported compound assignment operator for field '{column}'"
2768                )));
2769            }
2770        };
2771        if !out.is_finite() {
2772            return Err(RedDBError::Query(format!(
2773                "numeric overflow in compound assignment for field '{column}'"
2774            )));
2775        }
2776        return Ok(Value::Float(out));
2777    }
2778
2779    let a = lhs_number.as_i128();
2780    let b = rhs_number.as_i128();
2781    let out = match op {
2782        BinOp::Add => a.checked_add(b),
2783        BinOp::Sub => a.checked_sub(b),
2784        BinOp::Mul => a.checked_mul(b),
2785        BinOp::Mod => {
2786            if b == 0 {
2787                return Err(RedDBError::Query(format!(
2788                    "modulo by zero in compound assignment for field '{column}'"
2789                )));
2790            }
2791            a.checked_rem(b)
2792        }
2793        BinOp::Div => unreachable!("integer division is handled by the float branch"),
2794        _ => None,
2795    }
2796    .ok_or_else(|| {
2797        RedDBError::Query(format!(
2798            "numeric overflow in compound assignment for field '{column}'"
2799        ))
2800    })?;
2801
2802    if matches!(lhs, Value::UnsignedInteger(_)) {
2803        let value = u64::try_from(out).map_err(|_| {
2804            RedDBError::Query(format!(
2805                "numeric overflow in compound assignment for field '{column}'"
2806            ))
2807        })?;
2808        Ok(Value::UnsignedInteger(value))
2809    } else {
2810        let value = i64::try_from(out).map_err(|_| {
2811            RedDBError::Query(format!(
2812                "numeric overflow in compound assignment for field '{column}'"
2813            ))
2814        })?;
2815        Ok(Value::Integer(value))
2816    }
2817}
2818
2819#[derive(Clone, Copy)]
2820enum CompoundNumber {
2821    Integer(i128),
2822    Float(f64),
2823}
2824
2825impl CompoundNumber {
2826    fn from_value(value: &Value) -> Option<Self> {
2827        match value {
2828            Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2829            Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2830            Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2831            Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2832            _ => None,
2833        }
2834    }
2835
2836    fn is_float(self) -> bool {
2837        matches!(self, Self::Float(_))
2838    }
2839
2840    fn as_f64(self) -> f64 {
2841        match self {
2842            Self::Integer(value) => value as f64,
2843            Self::Float(value) => value,
2844        }
2845    }
2846
2847    fn as_i128(self) -> i128 {
2848        match self {
2849            Self::Integer(value) => value,
2850            Self::Float(_) => unreachable!("float compound number used as integer"),
2851        }
2852    }
2853}
2854
2855fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2856    match expr {
2857        Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2858        Expr::Column { field, .. } => {
2859            field_ref_matches_update_column(field, table_name, target_column)
2860        }
2861        Expr::BinaryOp { lhs, rhs, .. } => {
2862            expr_references_update_column(lhs, table_name, target_column)
2863                || expr_references_update_column(rhs, table_name, target_column)
2864        }
2865        Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2866            expr_references_update_column(operand, table_name, target_column)
2867        }
2868        Expr::FunctionCall { args, .. } => args
2869            .iter()
2870            .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2871        Expr::Case {
2872            branches, else_, ..
2873        } => {
2874            branches.iter().any(|(cond, value)| {
2875                expr_references_update_column(cond, table_name, target_column)
2876                    || expr_references_update_column(value, table_name, target_column)
2877            }) || else_
2878                .as_deref()
2879                .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2880        }
2881        Expr::IsNull { operand, .. } => {
2882            expr_references_update_column(operand, table_name, target_column)
2883        }
2884        Expr::InList { target, values, .. } => {
2885            expr_references_update_column(target, table_name, target_column)
2886                || values
2887                    .iter()
2888                    .any(|value| expr_references_update_column(value, table_name, target_column))
2889        }
2890        Expr::Between {
2891            target, low, high, ..
2892        } => {
2893            expr_references_update_column(target, table_name, target_column)
2894                || expr_references_update_column(low, table_name, target_column)
2895                || expr_references_update_column(high, table_name, target_column)
2896        }
2897        Expr::WindowFunctionCall { args, window, .. } => {
2898            args.iter()
2899                .any(|arg| expr_references_update_column(arg, table_name, target_column))
2900                || window
2901                    .partition_by
2902                    .iter()
2903                    .any(|e| expr_references_update_column(e, table_name, target_column))
2904                || window
2905                    .order_by
2906                    .iter()
2907                    .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
2908        }
2909    }
2910}
2911
2912fn field_ref_matches_update_column(
2913    field: &FieldRef,
2914    table_name: &str,
2915    target_column: &str,
2916) -> bool {
2917    match field {
2918        FieldRef::TableColumn { table, column } => {
2919            column.eq_ignore_ascii_case(target_column)
2920                && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2921        }
2922        FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2923            false
2924        }
2925    }
2926}
2927
2928fn resolve_update_entity_by_logical_id(
2929    runtime: &RedDBRuntime,
2930    table: &str,
2931    logical_id: EntityId,
2932) -> Option<UnifiedEntity> {
2933    let store = runtime.inner.db.store();
2934    if let Some(entity) =
2935        crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2936            .resolve_logical_id(&store, table, logical_id)
2937    {
2938        return Some(entity);
2939    }
2940    // Fallback for non-table-row entities (graph nodes/edges, etc.) where
2941    // entity_id == logical_id and the MVCC table-row resolver doesn't apply.
2942    store.get(table, logical_id)
2943}
2944
2945fn update_cdc_item_kind(
2946    runtime: &RedDBRuntime,
2947    collection: &str,
2948    entity: &UnifiedEntity,
2949) -> &'static str {
2950    match &entity.data {
2951        EntityData::Node(_) => return "node",
2952        EntityData::Edge(_) => return "edge",
2953        _ => {}
2954    }
2955
2956    match runtime
2957        .db()
2958        .collection_contract(collection)
2959        .map(|contract| contract.declared_model)
2960    {
2961        Some(crate::catalog::CollectionModel::Document) => "document",
2962        Some(crate::catalog::CollectionModel::Kv)
2963        | Some(crate::catalog::CollectionModel::Vault) => "kv",
2964        _ => "row",
2965    }
2966}
2967
2968fn ordered_update_target_ids(
2969    manager: &Arc<crate::storage::SegmentManager>,
2970    entity_ids: &[EntityId],
2971    order_by: &[OrderByClause],
2972    limit: Option<usize>,
2973) -> Vec<EntityId> {
2974    let mut entities: Vec<UnifiedEntity> =
2975        manager.get_many(entity_ids).into_iter().flatten().collect();
2976    entities.sort_by(|left, right| compare_update_order(left, right, order_by));
2977    if let Some(limit) = limit {
2978        entities.truncate(limit);
2979    }
2980    entities.into_iter().map(|entity| entity.id).collect()
2981}
2982
2983fn compare_update_order(
2984    left: &UnifiedEntity,
2985    right: &UnifiedEntity,
2986    order_by: &[OrderByClause],
2987) -> Ordering {
2988    for clause in order_by {
2989        let left_value = update_order_value(left, &clause.field);
2990        let right_value = update_order_value(right, &clause.field);
2991        let ordering = compare_update_order_values(
2992            left_value.as_ref(),
2993            right_value.as_ref(),
2994            clause.nulls_first,
2995        );
2996        if ordering != Ordering::Equal {
2997            return if clause.ascending {
2998                ordering
2999            } else {
3000                ordering.reverse()
3001            };
3002        }
3003    }
3004    left.logical_id().raw().cmp(&right.logical_id().raw())
3005}
3006
3007fn compare_update_order_values(
3008    left: Option<&Value>,
3009    right: Option<&Value>,
3010    nulls_first: bool,
3011) -> Ordering {
3012    match (left, right) {
3013        (None, None) => Ordering::Equal,
3014        (None, Some(_)) => {
3015            if nulls_first {
3016                Ordering::Less
3017            } else {
3018                Ordering::Greater
3019            }
3020        }
3021        (Some(_), None) => {
3022            if nulls_first {
3023                Ordering::Greater
3024            } else {
3025                Ordering::Less
3026            }
3027        }
3028        (Some(left), Some(right)) => {
3029            crate::storage::query::value_compare::total_compare_values(left, right)
3030        }
3031    }
3032}
3033
3034fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3035    let FieldRef::TableColumn { table, column } = field else {
3036        return None;
3037    };
3038    if !table.is_empty() {
3039        return None;
3040    }
3041    if column.eq_ignore_ascii_case("rid") {
3042        return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3043    }
3044    match &entity.data {
3045        EntityData::Row(row) => row.get_field(column).cloned(),
3046        EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3047            .and_then(|record| record.get(column).cloned()),
3048        _ => None,
3049    }
3050}
3051
/// Removes repeated column names, comparing ASCII case-insensitively.
///
/// The first spelling of each name is kept and the original order is
/// preserved.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(columns.len());
    for candidate in columns {
        let already_seen = kept
            .iter()
            .any(|earlier| earlier.eq_ignore_ascii_case(&candidate));
        if !already_seen {
            kept.push(candidate);
        }
    }
    kept
}
3068
3069// =============================================================================
3070// Helper functions for extracting typed values from column/value pairs
3071// =============================================================================
3072
/// Column names that are treated as TTL metadata keys.
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Maps `column` to the matching entry of [`SQL_TTL_METADATA_COLUMNS`],
/// comparing ASCII case-insensitively, or returns `None` when the column is
/// not a TTL metadata column.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|known| column.eq_ignore_ascii_case(known))
}
3086
3087/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
3088/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
3089/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
3090/// `_ttl_ms` and `_expires_at` are passed through.
3091fn canonicalize_sql_ttl_metadata(
3092    key: &'static str,
3093    value: MetadataValue,
3094) -> (&'static str, MetadataValue) {
3095    if key != "_ttl" {
3096        return (key, value);
3097    }
3098    let scaled = match value {
3099        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3100        MetadataValue::Timestamp(ms_or_s) => {
3101            // Timestamp is already chosen for very large values; treat as
3102            // already-ms to avoid silent overflow.
3103            MetadataValue::Timestamp(ms_or_s)
3104        }
3105        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3106        other => other,
3107    };
3108    ("_ttl_ms", scaled)
3109}
3110
3111/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
3112/// `SECRET('...')` literals. The runtime strips this marker and
3113/// applies the actual crypto transform during INSERT execution.
3114pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3115
3116impl RedDBRuntime {
3117    /// Strip the plaintext sentinel from a `Value::Password` or
3118    /// `Value::Secret` produced by the parser and apply the real
3119    /// crypto transform. `Password` is always hashed with argon2id.
3120    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
3121    /// when `red.config.secret.auto_encrypt = true` (default).
3122    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3123        match value {
3124            Value::Password(marked) => {
3125                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3126                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
3127                } else {
3128                    Ok(Value::Password(marked))
3129                }
3130            }
3131            Value::Secret(bytes) => {
3132                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3133                    if !self.secret_auto_encrypt() {
3134                        return Err(RedDBError::Query(
3135                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
3136                             is false. Insert pre-encrypted bytes directly instead."
3137                                .to_string(),
3138                        ));
3139                    }
3140                    let key = self.secret_aes_key().ok_or_else(|| {
3141                        RedDBError::Query(
3142                            "SECRET() column encryption requires a bootstrapped \
3143                             vault (red.secret.aes_key is missing). Start the server \
3144                             with --vault to enable."
3145                                .to_string(),
3146                        )
3147                    })?;
3148                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3149                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3150                } else {
3151                    Ok(Value::Secret(bytes))
3152                }
3153            }
3154            other => Ok(other),
3155        }
3156    }
3157}
3158
3159/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
3160/// This is the on-disk representation of `Value::Secret`.
3161fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3162    let nonce_bytes = crate::auth::store::random_bytes(12);
3163    let mut nonce = [0u8; 12];
3164    nonce.copy_from_slice(&nonce_bytes[..12]);
3165    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3166    let mut out = Vec::with_capacity(12 + ct.len());
3167    out.extend_from_slice(&nonce);
3168    out.extend_from_slice(&ct);
3169    out
3170}
3171
3172/// Decode a `Value::Secret` payload back to plaintext. Returns
3173/// `None` when the payload is too short or AES-GCM authentication
3174/// fails (tampered or wrong key).
3175pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3176    if payload.len() < 12 {
3177        return None;
3178    }
3179    let mut nonce = [0u8; 12];
3180    nonce.copy_from_slice(&payload[..12]);
3181    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3182}
3183
3184fn split_insert_metadata(
3185    runtime: &RedDBRuntime,
3186    columns: &[String],
3187    values: &[Value],
3188) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3189    let mut fields = Vec::new();
3190    let mut metadata = Vec::new();
3191
3192    for (column, value) in columns.iter().zip(values.iter()) {
3193        // Still support legacy _ttl columns for backward compat
3194        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3195            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3196            let (canonical_key, canonical_value) =
3197                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3198            metadata.push((canonical_key.to_string(), canonical_value));
3199            continue;
3200        }
3201        fields.push((
3202            column.clone(),
3203            runtime.resolve_crypto_sentinel(value.clone())?,
3204        ));
3205    }
3206
3207    Ok((fields, metadata))
3208}
3209
3210/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
3211fn merge_with_clauses(
3212    metadata: &mut Vec<(String, MetadataValue)>,
3213    ttl_ms: Option<u64>,
3214    expires_at_ms: Option<u64>,
3215    with_metadata: &[(String, Value)],
3216) {
3217    if let Some(ms) = ttl_ms {
3218        metadata.push((
3219            "_ttl_ms".to_string(),
3220            if ms <= i64::MAX as u64 {
3221                MetadataValue::Int(ms as i64)
3222            } else {
3223                MetadataValue::Timestamp(ms)
3224            },
3225        ));
3226    }
3227    if let Some(ms) = expires_at_ms {
3228        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3229    }
3230    for (key, value) in with_metadata {
3231        let meta_value = match value {
3232            Value::Text(s) => MetadataValue::String(s.to_string()),
3233            Value::Integer(n) => MetadataValue::Int(*n),
3234            Value::Float(n) => MetadataValue::Float(*n),
3235            Value::Boolean(b) => MetadataValue::Bool(*b),
3236            _ => MetadataValue::String(value.to_string()),
3237        };
3238        metadata.push((key.clone(), meta_value));
3239    }
3240}
3241
3242fn merge_vector_metadata_column(
3243    metadata: &mut Vec<(String, MetadataValue)>,
3244    columns: &[String],
3245    values: &[Value],
3246) -> RedDBResult<()> {
3247    let Some(value) = columns
3248        .iter()
3249        .position(|column| column.eq_ignore_ascii_case("metadata"))
3250        .map(|index| &values[index])
3251    else {
3252        return Ok(());
3253    };
3254    let json = match value {
3255        Value::Null => return Ok(()),
3256        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3257            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3258        })?,
3259        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3260            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3261        })?,
3262        other => {
3263            return Err(RedDBError::Query(format!(
3264                "column 'metadata' expected JSON object, got {other:?}"
3265            )))
3266        }
3267    };
3268    let parsed = metadata_from_json(&json)?;
3269    for (key, value) in parsed.iter() {
3270        metadata.push((key.clone(), value.clone()));
3271    }
3272    Ok(())
3273}
3274
3275fn apply_collection_default_ttl_metadata(
3276    runtime: &RedDBRuntime,
3277    collection: &str,
3278    metadata: &mut Vec<(String, MetadataValue)>,
3279) {
3280    if has_internal_ttl_metadata(metadata) {
3281        return;
3282    }
3283
3284    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3285        return;
3286    };
3287
3288    metadata.push((
3289        "_ttl_ms".to_string(),
3290        if default_ttl_ms <= i64::MAX as u64 {
3291            MetadataValue::Int(default_ttl_ms as i64)
3292        } else {
3293            MetadataValue::Timestamp(default_ttl_ms)
3294        },
3295    ));
3296}
3297
3298fn ensure_non_tree_reserved_metadata_entries(
3299    metadata: &[(String, MetadataValue)],
3300) -> RedDBResult<()> {
3301    for (key, _) in metadata {
3302        ensure_non_tree_reserved_metadata_key(key)?;
3303    }
3304    Ok(())
3305}
3306
3307fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3308    if key.starts_with(TREE_METADATA_PREFIX) {
3309        return Err(RedDBError::Query(format!(
3310            "metadata key '{}' is reserved for managed trees",
3311            key
3312        )));
3313    }
3314    Ok(())
3315}
3316
3317fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3318    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3319        return Err(RedDBError::Query(format!(
3320            "edge label '{}' is reserved for managed trees",
3321            TREE_CHILD_EDGE_LABEL
3322        )));
3323    }
3324    Ok(())
3325}
3326
3327fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3328    let mut columns = Vec::with_capacity(pairs.len());
3329    let mut values = Vec::with_capacity(pairs.len());
3330
3331    for (column, value) in pairs {
3332        columns.push(column.clone());
3333        values.push(value.clone());
3334    }
3335
3336    (columns, values)
3337}
3338
3339/// Find a required column value and return it as-is.
3340fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3341    for (i, col) in columns.iter().enumerate() {
3342        if col.eq_ignore_ascii_case(name) {
3343            return Ok(values[i].clone());
3344        }
3345    }
3346    Err(RedDBError::Query(format!(
3347        "required column '{name}' not found in INSERT"
3348    )))
3349}
3350
3351/// Find a required column value and coerce to String.
3352fn find_column_value_string(
3353    columns: &[String],
3354    values: &[Value],
3355    name: &str,
3356) -> RedDBResult<String> {
3357    let val = find_column_value(columns, values, name)?;
3358    match val {
3359        Value::Text(s) => Ok(s.to_string()),
3360        Value::Integer(n) => Ok(n.to_string()),
3361        Value::Float(n) => Ok(n.to_string()),
3362        other => Err(RedDBError::Query(format!(
3363            "column '{name}' expected text, got {other:?}"
3364        ))),
3365    }
3366}
3367
3368fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3369    let val = find_column_value(columns, values, name)?;
3370    match val {
3371        Value::Float(n) => Ok(n),
3372        Value::Integer(n) => Ok(n as f64),
3373        Value::UnsignedInteger(n) => Ok(n as f64),
3374        Value::Text(s) => s
3375            .parse::<f64>()
3376            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3377        other => Err(RedDBError::Query(format!(
3378            "column '{name}' expected number, got {other:?}"
3379        ))),
3380    }
3381}
3382
3383/// Find an optional column value as String.
3384fn find_column_value_opt_string(
3385    columns: &[String],
3386    values: &[Value],
3387    name: &str,
3388) -> Option<String> {
3389    for (i, col) in columns.iter().enumerate() {
3390        if col.eq_ignore_ascii_case(name) {
3391            return match &values[i] {
3392                Value::Null => None,
3393                Value::Text(s) => Some(s.to_string()),
3394                Value::Integer(n) => Some(n.to_string()),
3395                Value::Float(n) => Some(n.to_string()),
3396                _ => None,
3397            };
3398        }
3399    }
3400    None
3401}
3402
3403/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
3404///
3405/// Accepts integer literals, decimal strings, and node labels resolved via
3406/// the per-collection graph label index (same source of truth that
3407/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
3408/// labels error so callers can fall back to the numeric id form.
3409fn resolve_edge_endpoint(
3410    store: &crate::storage::unified::UnifiedStore,
3411    collection: &str,
3412    columns: &[String],
3413    values: &[Value],
3414    name: &str,
3415) -> RedDBResult<u64> {
3416    let val = find_column_value(columns, values, name)?;
3417    match val {
3418        Value::Integer(n) => Ok(n as u64),
3419        Value::UnsignedInteger(n) => Ok(n),
3420        Value::Text(s) => {
3421            if let Ok(n) = s.parse::<u64>() {
3422                return Ok(n);
3423            }
3424            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3425            match matches.len() {
3426                0 => Err(RedDBError::Query(format!(
3427                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3428                ))),
3429                1 => Ok(matches[0].raw()),
3430                n => Err(RedDBError::Query(format!(
3431                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3432                ))),
3433            }
3434        }
3435        other => Err(RedDBError::Query(format!(
3436            "column '{name}' expected integer or node label, got {other:?}"
3437        ))),
3438    }
3439}
3440
3441fn resolve_edge_endpoint_any(
3442    store: &crate::storage::unified::UnifiedStore,
3443    collection: &str,
3444    columns: &[String],
3445    values: &[Value],
3446    names: &[&str],
3447) -> RedDBResult<u64> {
3448    for name in names {
3449        if columns
3450            .iter()
3451            .any(|column| column.eq_ignore_ascii_case(name))
3452        {
3453            return resolve_edge_endpoint(store, collection, columns, values, name);
3454        }
3455    }
3456
3457    Err(RedDBError::Query(format!(
3458        "required column '{}' not found in INSERT",
3459        names.first().copied().unwrap_or("from_rid")
3460    )))
3461}
3462
3463/// Find a required column value and coerce to u64.
3464fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3465    let val = find_column_value(columns, values, name)?;
3466    match val {
3467        Value::Integer(n) => Ok(n as u64),
3468        Value::UnsignedInteger(n) => Ok(n),
3469        Value::Text(s) => s
3470            .parse::<u64>()
3471            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3472        other => Err(RedDBError::Query(format!(
3473            "column '{name}' expected integer, got {other:?}"
3474        ))),
3475    }
3476}
3477
3478/// Find an optional column value as f32.
3479fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3480    for (i, col) in columns.iter().enumerate() {
3481        if col.eq_ignore_ascii_case(name) {
3482            return match &values[i] {
3483                Value::Float(n) => Some(*n as f32),
3484                Value::Integer(n) => Some(*n as f32),
3485                Value::Null => None,
3486                _ => None,
3487            };
3488        }
3489    }
3490    None
3491}
3492
3493/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
3494fn find_column_value_vec_f32(
3495    columns: &[String],
3496    values: &[Value],
3497    name: &str,
3498) -> RedDBResult<Vec<f32>> {
3499    let val = find_column_value(columns, values, name)?;
3500    match val {
3501        Value::Vector(v) => Ok(v),
3502        Value::Json(bytes) => {
3503            // Try to parse as JSON array of numbers
3504            let s = std::str::from_utf8(&bytes).map_err(|_| {
3505                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3506            })?;
3507            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3508                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3509            })?;
3510            Ok(arr)
3511        }
3512        other => Err(RedDBError::Query(format!(
3513            "column '{name}' expected vector, got {other:?}"
3514        ))),
3515    }
3516}
3517
3518fn find_column_value_vec_f32_any(
3519    columns: &[String],
3520    values: &[Value],
3521    names: &[&str],
3522) -> RedDBResult<Vec<f32>> {
3523    for name in names {
3524        if columns
3525            .iter()
3526            .any(|column| column.eq_ignore_ascii_case(name))
3527        {
3528            return find_column_value_vec_f32(columns, values, name);
3529        }
3530    }
3531    Err(RedDBError::Query(format!(
3532        "required vector column '{}' not found in INSERT",
3533        names.join("' or '")
3534    )))
3535}
3536
3537/// Extract remaining properties (all columns not in the exclusion list).
3538fn extract_remaining_properties(
3539    columns: &[String],
3540    values: &[Value],
3541    exclude: &[&str],
3542) -> Vec<(String, Value)> {
3543    columns
3544        .iter()
3545        .zip(values.iter())
3546        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3547        .map(|(col, val)| (col.clone(), val.clone()))
3548        .collect()
3549}
3550
3551fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3552    let mut invalid = Vec::new();
3553    for column in columns {
3554        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3555            invalid.push(column.clone());
3556        }
3557    }
3558
3559    if invalid.is_empty() {
3560        Ok(())
3561    } else {
3562        Err(RedDBError::Query(format!(
3563            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3564            invalid.join(", ")
3565        )))
3566    }
3567}
3568
/// Returns `true` when `column` (ASCII case-insensitive) is one of the
/// column names a timeseries INSERT accepts.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        // Analytics-event extension (#577): an analytics row carries
        // an `event_name` + JSON `payload`. The payload is validated
        // against the AnalyticsSchemaRegistry inside
        // `insert_timeseries_point` before the row lands.
        "event_name",
        "payload",
    ];

    ACCEPTED
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
3586
3587fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3588    let mut found = None;
3589
3590    for alias in ["timestamp_ns", "timestamp", "time"] {
3591        for (index, column) in columns.iter().enumerate() {
3592            if !column.eq_ignore_ascii_case(alias) {
3593                continue;
3594            }
3595
3596            if found.is_some() {
3597                return Err(RedDBError::Query(
3598                    "timeseries INSERT accepts only one timestamp column".to_string(),
3599                ));
3600            }
3601
3602            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3603        }
3604    }
3605
3606    Ok(found)
3607}
3608
3609fn find_timeseries_tags(
3610    columns: &[String],
3611    values: &[Value],
3612) -> RedDBResult<std::collections::HashMap<String, String>> {
3613    for (index, column) in columns.iter().enumerate() {
3614        if column.eq_ignore_ascii_case("tags") {
3615            return parse_timeseries_tags(&values[index]);
3616        }
3617    }
3618    Ok(std::collections::HashMap::new())
3619}
3620
3621fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3622    match value {
3623        Value::Null => Ok(std::collections::HashMap::new()),
3624        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3625        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3626        other => Err(RedDBError::Query(format!(
3627            "timeseries tags must be a JSON object or JSON text, got {other:?}"
3628        ))),
3629    }
3630}
3631
3632fn parse_timeseries_tags_json(
3633    bytes: &[u8],
3634) -> RedDBResult<std::collections::HashMap<String, String>> {
3635    let json: crate::json::Value = crate::json::from_slice(bytes)
3636        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3637
3638    let object = match json {
3639        crate::json::Value::Object(object) => object,
3640        other => {
3641            return Err(RedDBError::Query(format!(
3642                "timeseries tags must be a JSON object, got {other:?}"
3643            )))
3644        }
3645    };
3646
3647    let mut tags = std::collections::HashMap::with_capacity(object.len());
3648    for (key, value) in object {
3649        tags.insert(key, json_tag_value_to_string(&value));
3650    }
3651    Ok(tags)
3652}
3653
3654/// Encode a tag value for storage so the original JSON type can be
3655/// recovered on read (issue #543).
3656///
3657/// Time-series tags are stored as `HashMap<String, String>` on the
3658/// physical record (see [`crate::storage::TimeSeriesData`]) so that
3659/// the segment codec, WAL and gRPC mirrors don't need a new value
3660/// variant. To preserve the original JSON type across that
3661/// string-only channel we prepend the
3662/// [`crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX`] marker
3663/// and serialize the value as compact JSON text. The read paths
3664/// (`timeseries_tags_json_value` / `timeseries_tags_value`) detect
3665/// the marker, parse the suffix, and recover a real JSON value.
3666/// Tags written through other channels (Prometheus remote write,
3667/// metrics handlers, legacy on-disk data) lack the marker and are
3668/// returned as `JsonValue::String(raw)` exactly as before.
3669fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3670    let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3671    buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3672    buf.push_str(&value.to_string_compact());
3673    buf
3674}
3675
3676fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3677    match value {
3678        Value::UnsignedInteger(value) => Ok(*value),
3679        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3680        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3681        Value::Text(value) => value.parse::<u64>().map_err(|_| {
3682            RedDBError::Query(format!(
3683                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3684            ))
3685        }),
3686        other => Err(RedDBError::Query(format!(
3687            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3688        ))),
3689    }
3690}
3691
/// Returns the current wall-clock time as nanoseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`; a nanosecond count that
/// does not fit in a `u64` saturates at `u64::MAX`.
fn current_unix_ns() -> u64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };

    let nanos: u128 = since_epoch.as_nanos();
    if nanos > u128::from(u64::MAX) {
        u64::MAX
    } else {
        nanos as u64
    }
}
3699
3700fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3701    use crate::json::{Map, Value as JV};
3702    match value {
3703        MetadataValue::Null => JV::Null,
3704        MetadataValue::Bool(value) => JV::Bool(*value),
3705        MetadataValue::Int(value) => JV::Number(*value as f64),
3706        MetadataValue::Float(value) => JV::Number(*value),
3707        MetadataValue::String(value) => JV::String(value.clone()),
3708        MetadataValue::Bytes(value) => JV::Array(
3709            value
3710                .iter()
3711                .map(|value| JV::Number(*value as f64))
3712                .collect(),
3713        ),
3714        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3715        MetadataValue::Array(values) => {
3716            JV::Array(values.iter().map(metadata_value_to_json).collect())
3717        }
3718        MetadataValue::Object(object) => {
3719            let entries = object
3720                .iter()
3721                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3722                .collect();
3723            JV::Object(entries)
3724        }
3725        MetadataValue::Geo { lat, lon } => {
3726            let mut object = Map::new();
3727            object.insert("lat".to_string(), JV::Number(*lat));
3728            object.insert("lon".to_string(), JV::Number(*lon));
3729            JV::Object(object)
3730        }
3731        MetadataValue::Reference(target) => {
3732            let mut object = Map::new();
3733            object.insert(
3734                "collection".to_string(),
3735                JV::String(target.collection().to_string()),
3736            );
3737            object.insert(
3738                "entity_id".to_string(),
3739                JV::Number(target.entity_id().raw() as f64),
3740            );
3741            JV::Object(object)
3742        }
3743        MetadataValue::References(values) => {
3744            let refs = values
3745                .iter()
3746                .map(|target| {
3747                    let mut object = Map::new();
3748                    object.insert(
3749                        "collection".to_string(),
3750                        JV::String(target.collection().to_string()),
3751                    );
3752                    object.insert(
3753                        "entity_id".to_string(),
3754                        JV::Number(target.entity_id().raw() as f64),
3755                    );
3756                    JV::Object(object)
3757                })
3758                .collect();
3759            JV::Array(refs)
3760        }
3761    }
3762}
3763
3764fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3765    match value {
3766        Value::Null => MetadataValue::Null,
3767        Value::Boolean(value) => MetadataValue::Bool(*value),
3768        Value::Integer(value) => MetadataValue::Int(*value),
3769        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3770        Value::Float(value) => MetadataValue::Float(*value),
3771        Value::Text(value) => MetadataValue::String(value.to_string()),
3772        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3773        Value::Timestamp(value) => {
3774            if *value >= 0 {
3775                metadata_u64_to_value(*value as u64)
3776            } else {
3777                MetadataValue::Int(*value)
3778            }
3779        }
3780        Value::TimestampMs(value) => {
3781            if *value >= 0 {
3782                metadata_u64_to_value(*value as u64)
3783            } else {
3784                MetadataValue::Int(*value)
3785            }
3786        }
3787        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3788        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3789        Value::Date(value) => MetadataValue::String(value.to_string()),
3790        Value::Time(value) => MetadataValue::String(value.to_string()),
3791        Value::Decimal(value) => MetadataValue::String(value.to_string()),
3792        Value::Ipv4(value) => MetadataValue::String(format!(
3793            "{}.{}.{}.{}",
3794            (value >> 24) & 0xFF,
3795            (value >> 16) & 0xFF,
3796            (value >> 8) & 0xFF,
3797            value & 0xFF
3798        )),
3799        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3800        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3801        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3802        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3803            lat: *lat as f64 / 1_000_000.0,
3804            lon: *lon as f64 / 1_000_000.0,
3805        },
3806        Value::BigInt(value) => MetadataValue::String(value.to_string()),
3807        Value::TableRef(value) => MetadataValue::String(value.clone()),
3808        Value::PageRef(value) => MetadataValue::Int(*value as i64),
3809        Value::Password(value) => MetadataValue::String(value.clone()),
3810        Value::Array(values) => {
3811            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3812        }
3813        _ => MetadataValue::String(value.to_string()),
3814    }
3815}
3816
3817fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3818    match value {
3819        Value::Null => Ok(MetadataValue::Null),
3820        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3821        Value::Integer(_) => Err(RedDBError::Query(format!(
3822            "column '{field}' must be non-negative for TTL metadata"
3823        ))),
3824        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3825        Value::Float(value) if value.is_finite() => {
3826            if value.fract().abs() >= f64::EPSILON {
3827                return Err(RedDBError::Query(format!(
3828                    "column '{field}' must be an integer (TTL metadata must be an integer)"
3829                )));
3830            }
3831            if *value < 0.0 {
3832                return Err(RedDBError::Query(format!(
3833                    "column '{field}' must be non-negative for TTL metadata"
3834                )));
3835            }
3836            if *value > u64::MAX as f64 {
3837                return Err(RedDBError::Query(format!(
3838                    "column '{field}' value is too large"
3839                )));
3840            }
3841            Ok(metadata_u64_to_value(*value as u64))
3842        }
3843        Value::Float(_) => Err(RedDBError::Query(format!(
3844            "column '{field}' must be a finite number"
3845        ))),
3846        Value::Text(value) => {
3847            let value = value.trim();
3848            if let Ok(value) = value.parse::<u64>() {
3849                Ok(metadata_u64_to_value(value))
3850            } else if let Ok(value) = value.parse::<i64>() {
3851                if value < 0 {
3852                    return Err(RedDBError::Query(format!(
3853                        "column '{field}' must be non-negative for TTL metadata"
3854                    )));
3855                }
3856                Ok(metadata_u64_to_value(value as u64))
3857            } else if let Ok(value) = value.parse::<f64>() {
3858                if !value.is_finite() {
3859                    return Err(RedDBError::Query(format!(
3860                        "column '{field}' must be a finite number"
3861                    )));
3862                }
3863                if value.fract().abs() >= f64::EPSILON {
3864                    return Err(RedDBError::Query(format!(
3865                        "column '{field}' must be an integer (TTL metadata must be an integer)"
3866                    )));
3867                }
3868                if value < 0.0 {
3869                    return Err(RedDBError::Query(format!(
3870                        "column '{field}' must be non-negative for TTL metadata"
3871                    )));
3872                }
3873                if value > u64::MAX as f64 {
3874                    return Err(RedDBError::Query(format!(
3875                        "column '{field}' value is too large"
3876                    )));
3877                }
3878                Ok(metadata_u64_to_value(value as u64))
3879            } else {
3880                Err(RedDBError::Query(format!(
3881                    "column '{field}' expects a numeric value for TTL metadata"
3882                )))
3883            }
3884        }
3885        _ => Err(RedDBError::Query(format!(
3886            "column '{field}' expects a numeric value for TTL metadata"
3887        ))),
3888    }
3889}
3890
3891fn metadata_u64_to_value(value: u64) -> MetadataValue {
3892    if value <= i64::MAX as u64 {
3893        MetadataValue::Int(value as i64)
3894    } else {
3895        MetadataValue::Timestamp(value)
3896    }
3897}
3898
3899/// Phase 2 PG parity: inspect a column value and return `true` when
3900/// the dotted `tail` path is already present under it. Used by the
3901/// tenant auto-fill so rows that already carry an explicit value
3902/// (bulk import, admin insert on behalf of a tenant) are not
3903/// double-stamped with the session's current_tenant().
3904fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3905    let json = match value {
3906        Value::Null => return false,
3907        Value::Json(bytes) | Value::Blob(bytes) => {
3908            match crate::json::from_slice::<crate::json::Value>(bytes) {
3909                Ok(v) => v,
3910                Err(_) => return false,
3911            }
3912        }
3913        Value::Text(s) => {
3914            let trimmed = s.trim_start();
3915            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3916                return false;
3917            }
3918            match crate::json::from_str::<crate::json::Value>(s) {
3919                Ok(v) => v,
3920                Err(_) => return false,
3921            }
3922        }
3923        _ => return false,
3924    };
3925    let mut cursor = &json;
3926    for seg in tail.split('.') {
3927        match cursor {
3928            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3929                Some((_, v)) => cursor = v,
3930                None => return false,
3931            },
3932            _ => return false,
3933        }
3934    }
3935    !matches!(cursor, crate::json::Value::Null)
3936}
3937
3938/// Phase 2 PG parity: take a column value (possibly Null / Text /
3939/// Json) and return a `Value::Json` with the dotted `tail` path set
3940/// to `tenant_id`. Preserves every pre-existing key.
3941///
3942/// Accepts:
3943/// * `Value::Null`  → fresh `{tail: tenant_id}` object
3944/// * `Value::Json(bytes)` → parse, navigate / create path, re-serialize
3945/// * `Value::text(s)` if `s` is valid JSON → same as Json
3946/// * anything else → error (user supplied a scalar where we need
3947///   a JSON container)
3948fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
3949    let mut root = match current {
3950        Value::Null => crate::json::Value::Object(Default::default()),
3951        Value::Json(bytes) | Value::Blob(bytes) => {
3952            crate::json::from_slice(&bytes).map_err(|err| {
3953                RedDBError::Query(format!(
3954                    "tenant auto-fill: root column is not valid JSON ({err})"
3955                ))
3956            })?
3957        }
3958        Value::Text(s) => {
3959            if s.trim().is_empty() {
3960                crate::json::Value::Object(Default::default())
3961            } else {
3962                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
3963                    RedDBError::Query(format!(
3964                        "tenant auto-fill: text root is not valid JSON ({err})"
3965                    ))
3966                })?
3967            }
3968        }
3969        other => {
3970            return Err(RedDBError::Query(format!(
3971                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
3972            )));
3973        }
3974    };
3975
3976    // Navigate path segments, creating intermediate objects on demand.
3977    let segments: Vec<&str> = tail.split('.').collect();
3978    let mut cursor: &mut crate::json::Value = &mut root;
3979    for (i, seg) in segments.iter().enumerate() {
3980        let is_last = i + 1 == segments.len();
3981        let map = match cursor {
3982            crate::json::Value::Object(m) => m,
3983            _ => {
3984                return Err(RedDBError::Query(format!(
3985                    "tenant auto-fill: segment '{seg}' is not inside an object"
3986                )));
3987            }
3988        };
3989        if is_last {
3990            map.insert(
3991                seg.to_string(),
3992                crate::json::Value::String(tenant_id.to_string()),
3993            );
3994            break;
3995        }
3996        cursor = map
3997            .entry(seg.to_string())
3998            .or_insert_with(|| crate::json::Value::Object(Default::default()));
3999    }
4000
4001    let bytes = crate::json::to_vec(&root).map_err(|err| {
4002        RedDBError::Query(format!(
4003            "tenant auto-fill: failed to re-serialize JSON ({err})"
4004        ))
4005    })?;
4006    Ok(Value::Json(bytes))
4007}
4008
4009#[cfg(test)]
4010mod tests {
4011    use crate::storage::schema::Value;
4012    use crate::storage::wal::{WalReader, WalRecord};
4013    use crate::{RedDBOptions, RedDBRuntime};
4014    use std::path::Path;
4015
4016    fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4017        WalReader::open(wal_path)
4018            .expect("wal opens")
4019            .iter()
4020            .map(|record| record.expect("wal record decodes").1)
4021            .filter_map(|record| match record {
4022                WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4023                _ => None,
4024            })
4025            .collect()
4026    }
4027
4028    fn action_contains_text(action: &[u8], needle: &str) -> bool {
4029        action
4030            .windows(needle.len())
4031            .any(|window| window == needle.as_bytes())
4032    }
4033
4034    fn assert_statement_writes_collections_in_one_new_wal_batch(
4035        rt: &RedDBRuntime,
4036        wal_path: &Path,
4037        statement: &str,
4038        source: &str,
4039        event_queue: &str,
4040    ) {
4041        let before_batches = store_commit_batches(wal_path).len();
4042
4043        rt.execute_query(statement).unwrap();
4044
4045        let batches = store_commit_batches(wal_path);
4046        let statement_batches = &batches[before_batches..];
4047        let source_batch = statement_batches
4048            .iter()
4049            .position(|actions| {
4050                actions.iter().any(|action| {
4051                    action_contains_text(action, source)
4052                        && !action_contains_text(action, event_queue)
4053                })
4054            })
4055            .expect("source collection write batch is present");
4056        let event_batch = statement_batches
4057            .iter()
4058            .position(|actions| {
4059                actions
4060                    .iter()
4061                    .any(|action| action_contains_text(action, event_queue))
4062            })
4063            .expect("event queue write batch is present");
4064
4065        assert_eq!(
4066            source_batch, event_batch,
4067            "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4068        );
4069    }
4070
4071    #[test]
4072    fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4073        let dir = tempfile::tempdir().unwrap();
4074        let db_path = dir.path().join("events_dual_write.rdb");
4075        let wal_path = db_path.with_extension("rdb-uwal");
4076        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4077
4078        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4079            .unwrap();
4080        assert_statement_writes_collections_in_one_new_wal_batch(
4081            &rt,
4082            &wal_path,
4083            "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4084            "users",
4085            "users_events",
4086        );
4087    }
4088
4089    #[test]
4090    fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4091        let dir = tempfile::tempdir().unwrap();
4092        let db_path = dir.path().join("events_update_atomic.rdb");
4093        let wal_path = db_path.with_extension("rdb-uwal");
4094        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4095
4096        rt.execute_query(
4097            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4098        )
4099        .unwrap();
4100        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4101            .unwrap();
4102
4103        assert_statement_writes_collections_in_one_new_wal_batch(
4104            &rt,
4105            &wal_path,
4106            "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4107            "users",
4108            "user_updates",
4109        );
4110    }
4111
4112    #[test]
4113    fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4114        let dir = tempfile::tempdir().unwrap();
4115        let db_path = dir.path().join("events_delete_atomic.rdb");
4116        let wal_path = db_path.with_extension("rdb-uwal");
4117        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4118
4119        rt.execute_query(
4120            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4121        )
4122        .unwrap();
4123        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4124            .unwrap();
4125
4126        assert_statement_writes_collections_in_one_new_wal_batch(
4127            &rt,
4128            &wal_path,
4129            "DELETE FROM users WHERE id = 1",
4130            "users",
4131            "user_deletes",
4132        );
4133    }
4134
4135    #[test]
4136    fn update_where_id_in_with_hash_index_updates_expected_rows() {
4137        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4138        rt.execute_query("CREATE TABLE users (id INT, score INT)")
4139            .unwrap();
4140        for id in 0..5 {
4141            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4142                .unwrap();
4143        }
4144        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4145            .unwrap();
4146
4147        let updated = rt
4148            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4149            .unwrap();
4150        assert_eq!(updated.affected_rows, 3);
4151
4152        let selected = rt
4153            .execute_query("SELECT id, score FROM users ORDER BY id")
4154            .unwrap();
4155        let scores: Vec<(i64, i64)> = selected
4156            .result
4157            .records
4158            .iter()
4159            .map(|record| {
4160                let id = match record.get("id").unwrap() {
4161                    Value::Integer(value) => *value,
4162                    other => panic!("expected integer id, got {other:?}"),
4163                };
4164                let score = match record.get("score").unwrap() {
4165                    Value::Integer(value) => *value,
4166                    other => panic!("expected integer score, got {other:?}"),
4167                };
4168                (id, score)
4169            })
4170            .collect();
4171        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4172    }
4173
4174    /// Drives UPDATE through the shared `DmlTargetScan` module — the
4175    /// same code path DELETE uses (#51, #52). Exercises the indexed
4176    /// equality fast-path (WHERE id = N with a HASH index), the
4177    /// unindexed range scan (WHERE score > N), and the no-WHERE
4178    /// full-scan branch to confirm the extracted "find target rows"
4179    /// loop preserves affected-row counts and the resulting row state.
4180    #[test]
4181    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4182        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4183        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4184            .unwrap();
4185        for id in 0..5 {
4186            rt.execute_query(&format!(
4187                "INSERT INTO items (id, score) VALUES ({id}, {})",
4188                id * 10
4189            ))
4190            .unwrap();
4191        }
4192        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4193            .unwrap();
4194
4195        // Indexed equality UPDATE — hits the hash fast-path inside
4196        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
4197        // below the score>25 cutoff so the next assertion stays clean.
4198        let updated_one = rt
4199            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4200            .unwrap();
4201        assert_eq!(updated_one.affected_rows, 1);
4202
4203        // Unindexed scan UPDATE — bumps everyone with score > 25,
4204        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4205        // zoned/full-scan branch.
4206        let updated_many = rt
4207            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4208            .unwrap();
4209        assert_eq!(updated_many.affected_rows, 2);
4210
4211        let snapshot = rt
4212            .execute_query("SELECT id, score FROM items ORDER BY id")
4213            .unwrap();
4214        let pairs: Vec<(i64, i64)> = snapshot
4215            .result
4216            .records
4217            .iter()
4218            .map(|record| {
4219                let id = match record.get("id").unwrap() {
4220                    Value::Integer(value) => *value,
4221                    other => panic!("expected integer id, got {other:?}"),
4222                };
4223                let score = match record.get("score").unwrap() {
4224                    Value::Integer(value) => *value,
4225                    other => panic!("expected integer score, got {other:?}"),
4226                };
4227                (id, score)
4228            })
4229            .collect();
4230        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4231
4232        // Full-scan UPDATE with no WHERE rewrites every remaining row.
4233        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4234        assert_eq!(updated_all.affected_rows, 5);
4235        let after = rt
4236            .execute_query("SELECT score FROM items ORDER BY id")
4237            .unwrap();
4238        let scores: Vec<i64> = after
4239            .result
4240            .records
4241            .iter()
4242            .map(|record| match record.get("score").unwrap() {
4243                Value::Integer(value) => *value,
4244                other => panic!("expected integer score, got {other:?}"),
4245            })
4246            .collect();
4247        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4248    }
4249
4250    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
4251    /// both the index fast-path (WHERE id = N with a HASH index) and
4252    /// the unindexed scan path (WHERE score > N) to confirm the
4253    /// extracted "find target rows" loop preserves the affected-row
4254    /// count and which rows survive.
4255    #[test]
4256    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4257        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4258        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4259            .unwrap();
4260        for id in 0..5 {
4261            rt.execute_query(&format!(
4262                "INSERT INTO items (id, score) VALUES ({id}, {})",
4263                id * 10
4264            ))
4265            .unwrap();
4266        }
4267        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4268            .unwrap();
4269
4270        // Indexed equality DELETE — hits the hash fast-path inside
4271        // DmlTargetScan::find_target_ids.
4272        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4273        assert_eq!(deleted_one.affected_rows, 1);
4274
4275        // Unindexed scan DELETE — drops everyone with score > 25,
4276        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4277        // zoned/full-scan branch.
4278        let deleted_many = rt
4279            .execute_query("DELETE FROM items WHERE score > 25")
4280            .unwrap();
4281        assert_eq!(deleted_many.affected_rows, 2);
4282
4283        let surviving = rt
4284            .execute_query("SELECT id FROM items ORDER BY id")
4285            .unwrap();
4286        let ids: Vec<i64> = surviving
4287            .result
4288            .records
4289            .iter()
4290            .map(|record| match record.get("id").unwrap() {
4291                Value::Integer(value) => *value,
4292                other => panic!("expected integer id, got {other:?}"),
4293            })
4294            .collect();
4295        assert_eq!(ids, vec![0, 1]);
4296
4297        // Sanity: full-scan DELETE with no WHERE clears the rest.
4298        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4299        assert_eq!(deleted_rest.affected_rows, 2);
4300        let empty = rt.execute_query("SELECT id FROM items").unwrap();
4301        assert!(empty.result.records.is_empty());
4302    }
4303
4304    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
4305    /// INSERT but reject UPDATE and DELETE with the documented
4306    /// operator-facing error strings. Drives all three DML verbs so
4307    /// the centralized gate is exercised end-to-end.
4308    #[test]
4309    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4310        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4311        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4312            .unwrap();
4313
4314        // INSERT must succeed — APPEND ONLY exists precisely to allow
4315        // appends. The gate should be a no-op for INSERT.
4316        let inserted = rt
4317            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4318            .unwrap();
4319        assert_eq!(inserted.affected_rows, 1);
4320
4321        // UPDATE is rejected with the gate's UPDATE-specific message.
4322        let update_err = rt
4323            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4324            .unwrap_err();
4325        let msg = format!("{update_err}");
4326        assert!(
4327            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4328            "expected UPDATE rejection message, got: {msg}"
4329        );
4330
4331        // DELETE is rejected with the gate's DELETE-specific message.
4332        let delete_err = rt
4333            .execute_query("DELETE FROM events WHERE id = 1")
4334            .unwrap_err();
4335        let msg = format!("{delete_err}");
4336        assert!(
4337            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4338            "expected DELETE rejection message, got: {msg}"
4339        );
4340
4341        // Row should still be present — neither rejected mutation
4342        // touched storage.
4343        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4344        assert_eq!(surviving.result.records.len(), 1);
4345    }
4346
4347    /// CollectionContract gate: tables without an APPEND ONLY contract
4348    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
4349    /// is a true pass-through, not an accidental block.
4350    #[test]
4351    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4352        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4353        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4354            .unwrap();
4355
4356        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4357            .unwrap();
4358        let updated = rt
4359            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4360            .unwrap();
4361        assert_eq!(updated.affected_rows, 1);
4362        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4363        assert_eq!(deleted.affected_rows, 1);
4364    }
4365
/// An INSERT into a table declared `WITH EVENTS (INSERT) TO audit_log`
/// must enqueue exactly one event on `audit_log`. Every envelope field is
/// checked, followed by the `after` row image.
#[test]
fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let create_sql = "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log";
    runtime.execute_query(create_sql).unwrap();

    let insert_outcome = runtime
        .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
        .unwrap();
    assert_eq!(insert_outcome.affected_rows, 1);

    let payloads = queue_payloads(&runtime, "audit_log");
    assert_eq!(payloads.len(), 1);
    let envelope = payloads[0].as_object().expect("event payload object");

    // Identity and routing fields of the envelope.
    let event_id = envelope
        .get("event_id")
        .and_then(crate::json::Value::as_str);
    assert!(event_id.is_some_and(|text| !text.is_empty()));

    let op = envelope.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("insert"));

    let collection = envelope
        .get("collection")
        .and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));

    let row_id = envelope.get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(row_id, Some(7));

    // `ts` and `lsn` only need to be present and numeric.
    let ts = envelope.get("ts").and_then(crate::json::Value::as_u64);
    assert!(ts.is_some());
    let lsn = envelope.get("lsn").and_then(crate::json::Value::as_u64);
    assert!(lsn.is_some());

    // No tenant is bound and an INSERT has no prior image: both are JSON null.
    assert!(matches!(
        envelope.get("tenant"),
        Some(crate::json::Value::Null)
    ));
    assert!(matches!(
        envelope.get("before"),
        Some(crate::json::Value::Null)
    ));

    // The `after` image carries the inserted column values.
    let after_image = envelope
        .get("after")
        .and_then(crate::json::Value::as_object)
        .expect("after object");
    let after_id = after_image.get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(after_id, Some(7));
    let after_email = after_image
        .get("email")
        .and_then(crate::json::Value::as_str);
    assert_eq!(after_email, Some("a@example.com"));
}
4427
/// A two-row `INSERT ... VALUES (..), (..)` into a `WITH EVENTS` table must
/// produce two `insert` events on the `users_events` queue, in row order
/// and with strictly increasing LSNs.
#[test]
fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
        .unwrap();

    let bulk_insert_sql =
        "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')";
    runtime.execute_query(bulk_insert_sql).unwrap();

    let payloads = queue_payloads(&runtime, "users_events");
    assert_eq!(payloads.len(), 2);

    // Starts at zero, so the first event's LSN must itself be positive.
    let mut last_lsn = 0;
    for (want_id, payload) in [1_u64, 2].into_iter().zip(payloads.iter()) {
        let envelope = payload.as_object().expect("event payload object");

        let op = envelope.get("op").and_then(crate::json::Value::as_str);
        assert_eq!(op, Some("insert"));

        let row_id = envelope.get("id").and_then(crate::json::Value::as_u64);
        assert_eq!(row_id, Some(want_id));

        let current_lsn = envelope
            .get("lsn")
            .and_then(crate::json::Value::as_u64)
            .expect("event lsn");
        assert!(
            current_lsn > last_lsn,
            "event LSNs should increase in row order"
        );
        last_lsn = current_lsn;

        let after_image = envelope
            .get("after")
            .and_then(crate::json::Value::as_object)
            .expect("after object");
        let after_id = after_image.get("id").and_then(crate::json::Value::as_u64);
        assert_eq!(after_id, Some(want_id));
    }
}
4471
4472    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4473        let result = rt
4474            .execute_query(&format!("QUEUE PEEK {queue} 10"))
4475            .expect("peek queue");
4476        result
4477            .result
4478            .records
4479            .iter()
4480            .map(
4481                |record| match record.get("payload").expect("payload column") {
4482                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4483                    other => panic!("expected JSON queue payload, got {other:?}"),
4484                },
4485            )
4486            .collect()
4487    }
4488
4489    // ── #112: auto-index user `id` on first insert ─────────────────────
4490
/// Acceptance test for #112: the first INSERT into a fresh collection that
/// has a column named `id` registers an implicit HASH index on that column.
///
/// Checked in order: no `id` index right after CREATE TABLE; `idx_id`
/// (HASH, on `bench_users.id`) after the first insert; later inserts remain
/// reachable through `WHERE id = N`; an equality DELETE affects one row.
///
/// Without the hook, `find_index_for_column` returns `None` and
/// DELETE/UPDATE fall through to a full segment scan — the 4× perf gap
/// documented in `docs/perf/delete-sequential-2026-05-06.md`.
#[test]
fn auto_index_id_fires_on_first_insert() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE bench_users (id INT, score INT)")
        .unwrap();

    // Before any row exists there must be no index covering `id`.
    assert!(
        runtime
            .index_store_ref()
            .find_index_for_column("bench_users", "id")
            .is_none(),
        "freshly created collection should not have an `id` index"
    );

    // Single-row INSERT — the `MutationEngine::append_one` path.
    let first_insert_sql = "INSERT INTO bench_users (id, score) VALUES (1, 10)";
    runtime.execute_query(first_insert_sql).unwrap();

    // The hook must now have registered a hash index named `idx_id`.
    let implicit_index = runtime
        .index_store_ref()
        .find_index_for_column("bench_users", "id")
        .expect("auto-index hook should have registered idx_id on first insert");
    assert_eq!(implicit_index.name, "idx_id");
    assert_eq!(implicit_index.collection, "bench_users");
    assert_eq!(implicit_index.columns, vec![String::from("id")]);
    assert!(matches!(
        implicit_index.method,
        super::super::index_store::IndexMethodKind::Hash
    ));

    // Rows inserted afterwards must land in the index as well ...
    for id in 2..=5 {
        let score = id * 10;
        let insert_sql = format!(
            "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
            score
        );
        runtime.execute_query(&insert_sql).unwrap();
    }
    // ... so every id, including the first, resolves to exactly one row.
    for id in 1..=5 {
        let select_sql = format!("SELECT score FROM bench_users WHERE id = {id}");
        let lookup = runtime.execute_query(&select_sql).unwrap();
        assert_eq!(
            lookup.result.records.len(),
            1,
            "id={id} should match one row"
        );
    }

    // Equality DELETE — the bench scenario behind the 4× regression. With
    // the index present, `find_target_ids` short-circuits before
    // `for_each_entity_zoned` runs.
    let delete_outcome = runtime
        .execute_query("DELETE FROM bench_users WHERE id = 3")
        .unwrap();
    assert_eq!(delete_outcome.affected_rows, 1);
}
4560
/// The multi-row VALUES path goes through `MutationEngine::append_batch`,
/// and the auto-index hook has to fire there as well. Otherwise the batch
/// entry points (gRPC binary bulk, HTTP bulk, wire bulk INSERT) would skip
/// auto-indexing altogether.
#[test]
fn auto_index_id_fires_on_first_bulk_insert() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    runtime
        .execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
        .unwrap();

    let bulk_insert_sql = "INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)";
    runtime.execute_query(bulk_insert_sql).unwrap();

    let implicit_index = runtime
        .index_store_ref()
        .find_index_for_column("bench_bulk", "id")
        .expect("auto-index hook should fire on first bulk insert");
    assert_eq!(implicit_index.name, "idx_id");

    // Each batch row (indexed via `index_entity_insert_batch`) must be
    // retrievable by its id.
    for id in 1..=3 {
        let select_sql = format!("SELECT score FROM bench_bulk WHERE id = {id}");
        let lookup = runtime.execute_query(&select_sql).unwrap();
        assert_eq!(lookup.result.records.len(), 1);
    }
}
4588
/// The hook does nothing for rows that carry no `id` column. The match is
/// deliberately conservative (case-sensitive `id`), so `Id`, `ID`, and
/// `red_entity_id` do not trigger it. This test exercises a `uid` column
/// and verifies that neither `id` nor `uid` ends up indexed.
#[test]
fn auto_index_id_skips_when_no_id_column() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for setup_sql in [
        "CREATE TABLE plain (uid INT, label TEXT)",
        "INSERT INTO plain (uid, label) VALUES (1, 'a')",
    ] {
        runtime.execute_query(setup_sql).unwrap();
    }

    for column in ["id", "uid"] {
        assert!(runtime
            .index_store_ref()
            .find_index_for_column("plain", column)
            .is_none());
    }
}
4609
/// The hook must respect an index that already covers `id`: when an
/// explicit `CREATE INDEX ... USING BTREE` exists before the first insert,
/// the hook sees it through `find_index_for_column` and does NOT replace it
/// with a HASH index.
#[test]
fn auto_index_id_skips_when_index_already_exists() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();

    // Table, then the user-declared BTREE index, then the first insert.
    for sql in [
        "CREATE TABLE pre (id INT, score INT)",
        "CREATE INDEX user_idx ON pre (id) USING BTREE",
        "INSERT INTO pre (id, score) VALUES (1, 10)",
    ] {
        runtime.execute_query(sql).unwrap();
    }

    let surviving_index = runtime
        .index_store_ref()
        .find_index_for_column("pre", "id")
        .expect("user index should still be there");
    assert_eq!(
        surviving_index.name, "user_idx",
        "auto-index hook must not overwrite an existing index"
    );
}
4634
/// Dropping a collection must also reap its implicit `idx_id`. The
/// existing `execute_drop_table` walks `list_indices` and drops every
/// entry; this confirms the auto-created index is part of that sweep.
#[test]
fn auto_index_id_dropped_with_collection() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for setup_sql in [
        "CREATE TABLE ephemeral (id INT, score INT)",
        "INSERT INTO ephemeral (id, score) VALUES (1, 10)",
    ] {
        runtime.execute_query(setup_sql).unwrap();
    }

    // The first insert created the implicit index.
    assert!(runtime
        .index_store_ref()
        .find_index_for_column("ephemeral", "id")
        .is_some());

    runtime.execute_query("DROP TABLE ephemeral").unwrap();

    // After the drop it must be gone.
    assert!(
        runtime
            .index_store_ref()
            .find_index_for_column("ephemeral", "id")
            .is_none(),
        "implicit `idx_id` must be reaped when its collection drops"
    );
}
4659
/// Opt-out: `RedDBOptions::with_auto_index_id(false)` (forwarded to
/// `UnifiedStoreConfig::auto_index_id`) turns the hook off. The first
/// insert then leaves the collection without an `id` index, and
/// DELETE/UPDATE fall back to the scan path.
#[test]
fn auto_index_id_disabled_by_config() {
    let runtime =
        RedDBRuntime::with_options(RedDBOptions::in_memory().with_auto_index_id(false)).unwrap();

    for setup_sql in [
        "CREATE TABLE off (id INT, score INT)",
        "INSERT INTO off (id, score) VALUES (1, 10)",
    ] {
        runtime.execute_query(setup_sql).unwrap();
    }

    assert!(
        runtime
            .index_store_ref()
            .find_index_for_column("off", "id")
            .is_none(),
        "with auto_index_id=false, no implicit index should be created"
    );
}
4681
4682    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
4683
/// #293: updating one row of a table declared `WITH EVENTS (UPDATE) TO
/// audit_log` enqueues exactly one `update` event whose `before`/`after`
/// images hold the old and new values of the changed column.
#[test]
fn update_single_row_emits_update_event() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let create_sql = "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log";
    runtime.execute_query(create_sql).unwrap();
    runtime
        .execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
        .unwrap();

    runtime
        .execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
        .unwrap();

    let payloads = queue_payloads(&runtime, "audit_log");
    assert_eq!(payloads.len(), 1, "expected exactly 1 update event");
    let envelope = payloads[0].as_object().expect("event payload object");

    let op = envelope.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("update"));

    let collection = envelope
        .get("collection")
        .and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));

    let event_id = envelope
        .get("event_id")
        .and_then(crate::json::Value::as_str);
    assert!(event_id.is_some_and(|text| !text.is_empty()));

    // Both row images must be objects for an UPDATE.
    let before_image = envelope
        .get("before")
        .and_then(crate::json::Value::as_object)
        .expect("before must be an object");
    let after_image = envelope
        .get("after")
        .and_then(crate::json::Value::as_object)
        .expect("after must be an object");

    let old_name = before_image
        .get("name")
        .and_then(crate::json::Value::as_str);
    assert_eq!(old_name, Some("Alice"), "before.name should be the old value");

    let new_name = after_image.get("name").and_then(crate::json::Value::as_str);
    assert_eq!(new_name, Some("Bob"), "after.name should be the new value");
}
4731
/// The `before`/`after` images of an update event are a diff: they contain
/// the column the UPDATE changed (`name`) and omit columns it left alone
/// (`email`).
#[test]
fn update_event_only_includes_changed_fields() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    let create_sql =
        "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts";
    runtime.execute_query(create_sql).unwrap();
    runtime
        .execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
        .unwrap();

    runtime
        .execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
        .unwrap();

    let payloads = queue_payloads(&runtime, "evts");
    assert_eq!(payloads.len(), 1);
    let envelope = payloads[0].as_object().unwrap();
    let before_image = envelope
        .get("before")
        .and_then(crate::json::Value::as_object)
        .unwrap();
    let after_image = envelope
        .get("after")
        .and_then(crate::json::Value::as_object)
        .unwrap();

    // The changed column appears on both sides of the diff.
    let before_has_name = before_image.contains_key("name");
    assert!(before_has_name, "before must include changed field");
    let after_has_name = after_image.contains_key("name");
    assert!(after_has_name, "after must include changed field");

    // The untouched column appears on neither side.
    let before_has_email = before_image.contains_key("email");
    assert!(!before_has_email, "before must not include unchanged email");
    let after_has_email = after_image.contains_key("email");
    assert!(!after_has_email, "after must not include unchanged email");
}
4775
/// An unfiltered UPDATE that touches three rows must enqueue three events,
/// each tagged `op = "update"`.
#[test]
fn multi_row_update_emits_one_event_per_row() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for setup_sql in [
        "CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts",
        "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
    ] {
        runtime.execute_query(setup_sql).unwrap();
    }

    runtime
        .execute_query("UPDATE items SET status = 'done'")
        .unwrap();

    let payloads = queue_payloads(&runtime, "evts");
    assert_eq!(payloads.len(), 3, "expected one update event per row");
    for envelope in payloads.iter().map(|payload| payload.as_object().unwrap()) {
        let op = envelope.get("op").and_then(crate::json::Value::as_str);
        assert_eq!(op, Some("update"));
    }
}
4799
/// Deleting one row of a table declared `WITH EVENTS (DELETE) TO del_log`
/// enqueues exactly one `delete` event: `before` holds the removed row and
/// `after` is JSON null.
#[test]
fn delete_single_row_emits_delete_event() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for setup_sql in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log",
        "INSERT INTO users (id, name) VALUES (42, 'Alice')",
    ] {
        runtime.execute_query(setup_sql).unwrap();
    }

    runtime
        .execute_query("DELETE FROM users WHERE id = 42")
        .unwrap();

    let payloads = queue_payloads(&runtime, "del_log");
    assert_eq!(payloads.len(), 1);
    let envelope = payloads[0].as_object().expect("event payload object");

    let op = envelope.get("op").and_then(crate::json::Value::as_str);
    assert_eq!(op, Some("delete"));

    let collection = envelope
        .get("collection")
        .and_then(crate::json::Value::as_str);
    assert_eq!(collection, Some("users"));

    let event_id = envelope
        .get("event_id")
        .and_then(crate::json::Value::as_str);
    assert!(event_id.is_some_and(|text| !text.is_empty()));

    // The full pre-delete row is carried in `before`.
    let before_image = envelope
        .get("before")
        .and_then(crate::json::Value::as_object)
        .expect("before must be an object for delete");
    let removed_id = before_image.get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(removed_id, Some(42));
    let removed_name = before_image
        .get("name")
        .and_then(crate::json::Value::as_str);
    assert_eq!(removed_name, Some("Alice"));

    // Nothing survives a delete, so `after` is null.
    assert!(matches!(
        envelope.get("after"),
        Some(crate::json::Value::Null)
    ));
}
4839
/// An unfiltered DELETE that removes three rows must enqueue three events,
/// each tagged `op = "delete"` with a null `after` image.
#[test]
fn multi_row_delete_emits_one_event_per_row() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for setup_sql in [
        "CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log",
        "INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)",
    ] {
        runtime.execute_query(setup_sql).unwrap();
    }

    runtime.execute_query("DELETE FROM items").unwrap();

    let payloads = queue_payloads(&runtime, "del_log");
    assert_eq!(
        payloads.len(),
        3,
        "expected one delete event per deleted row"
    );
    for envelope in payloads.iter().map(|payload| payload.as_object().unwrap()) {
        let op = envelope.get("op").and_then(crate::json::Value::as_str);
        assert_eq!(op, Some("delete"));
        assert!(matches!(
            envelope.get("after"),
            Some(crate::json::Value::Null)
        ));
    }
}
4861
/// The ops filter is honoured: a table subscribed only to UPDATE events
/// must stay silent for INSERT and DELETE.
#[test]
fn ops_filter_update_does_not_emit_on_insert_or_delete() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();

    // Subscribe to UPDATE only, then run the two verbs that are filtered out.
    for sql in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice')",
        "DELETE FROM users WHERE id = 1",
    ] {
        runtime.execute_query(sql).unwrap();
    }

    let payloads = queue_payloads(&runtime, "evts");
    assert!(
        payloads.is_empty(),
        "UPDATE-only filter must not emit INSERT or DELETE events"
    );
}
4878
4879    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
4880
/// An INSERT carrying the `SUPPRESS EVENTS` clause must leave the event
/// queue empty even though the table is event-enabled.
#[test]
fn suppress_events_on_insert_emits_no_events() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for sql in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS",
    ] {
        runtime.execute_query(sql).unwrap();
    }

    let payloads = queue_payloads(&runtime, "evts");
    assert!(
        payloads.is_empty(),
        "SUPPRESS EVENTS must prevent INSERT events"
    );
}
4896
/// An UPDATE carrying the `SUPPRESS EVENTS` clause must not enqueue an
/// event, even though the table is event-enabled for every operation.
///
/// The setup INSERT is not suppressed, so its event is first confirmed and
/// then purged; only then does an empty queue after the UPDATE prove that
/// the UPDATE itself emitted nothing.
#[test]
fn suppress_events_on_update_emits_no_events() {
    let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
        .unwrap();
    rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
        .unwrap();

    // Precondition: the plain INSERT left its event on the queue. This
    // shows events are flowing for this table, so the final emptiness check
    // cannot pass vacuously.
    assert_eq!(
        queue_payloads(&rt, "evts").len(),
        1,
        "setup INSERT should have emitted one event"
    );
    // Peeking does not appear to consume messages, so clear the queue
    // explicitly before the statement under test.
    rt.execute_query("QUEUE PURGE evts").unwrap();

    rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
        .unwrap();

    let events = queue_payloads(&rt, "evts");
    assert!(
        events.is_empty(),
        "SUPPRESS EVENTS must prevent UPDATE events"
    );
}
4918
/// A DELETE carrying the `SUPPRESS EVENTS` clause must not enqueue an
/// event. The seed INSERT is suppressed too, so the queue stays empty
/// throughout and no purge is needed.
#[test]
fn suppress_events_on_delete_emits_no_events() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for sql in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS",
        "DELETE FROM users WHERE id = 1 SUPPRESS EVENTS",
    ] {
        runtime.execute_query(sql).unwrap();
    }

    let payloads = queue_payloads(&runtime, "evts");
    assert!(
        payloads.is_empty(),
        "SUPPRESS EVENTS must prevent DELETE events"
    );
}
4938
/// `SUPPRESS EVENTS` applies to a single statement: a later plain INSERT on
/// the same table emits as usual, so only the second row (id 2) shows up
/// on the queue.
#[test]
fn normal_insert_after_suppress_still_emits() {
    let runtime = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
    for sql in [
        "CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts",
        "INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS",
        "INSERT INTO users (id, name) VALUES (2, 'Bob')",
    ] {
        runtime.execute_query(sql).unwrap();
    }

    let payloads = queue_payloads(&runtime, "evts");
    assert_eq!(
        payloads.len(),
        1,
        "only the non-suppressed INSERT should emit"
    );
    let emitted_id = payloads[0].get("id").and_then(crate::json::Value::as_u64);
    assert_eq!(emitted_id, Some(2));
}
4961}