Skip to main content

reddb_server/runtime/
impl_dml.rs

1//! DML execution: INSERT, UPDATE, DELETE via SQL AST
2//!
3//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
4//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
5//! operations through the existing `RuntimeEntityPort` trait so that all
6//! cross-cutting concerns (WAL, indexing, replication) are automatically
7//! applied.
8
9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11    CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12    CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13    RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16    build_row_update_contract_plan, entity_row_fields_snapshot,
17    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18    RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27    sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28    sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
// NOTE(review): the use sites of these constants are outside this view; the
// descriptions below are inferred from the names only — confirm at the callers.
// Presumably the maximum number of entities applied per batch by UPDATE.
37const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
// Presumably the edge label that links a tree parent to one of its children.
38const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
// Presumably the key prefix reserved for tree-related metadata entries.
39const TREE_METADATA_PREFIX: &str = "red.tree.";
40
/// A single UPDATE assignment kept in expression form.
///
/// NOTE(review): the code that builds and consumes this struct is outside
/// this view; the field notes are inferred from names and types — confirm.
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
    /// Name of the assigned column.
43    column: String,
    /// Right-hand-side expression of the assignment.
44    expr: Expr,
    /// Presumably the binary operator of a compound assignment (e.g. `+=`);
    /// `None` for a plain assignment — TODO confirm.
45    compound_op: Option<BinOp>,
    /// Presumably set when the assignment targets a metadata key rather than
    /// a regular field — TODO confirm.
46    metadata_key: Option<&'static str>,
    /// Optional row-contract rule attached to this column.
47    row_rule: Option<RowUpdateColumnRule>,
48}
49
/// Pre-compiled form of an UPDATE statement's assignments.
///
/// NOTE(review): the builder and consumers are outside this view; the field
/// notes are inferred from names and types — confirm against the use sites.
50struct CompiledUpdatePlan {
    /// Field assignments whose values are already known (`(column, value)`).
51    static_field_assignments: Vec<(String, Value)>,
    /// Metadata assignments whose values are already known.
52    static_metadata_assignments: Vec<(String, MetadataValue)>,
    /// Assignments still held as expressions; presumably evaluated per row.
53    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    /// Optional row-update contract plan for the target collection.
54    row_contract_plan: Option<RowUpdateContractPlan>,
    /// Presumably the names of the row columns this UPDATE modifies.
55    row_modified_columns: Vec<String>,
    /// Presumably `true` when a modified column has a uniqueness rule.
56    row_touches_unique_columns: bool,
57}
58
/// Concrete values produced from dynamic assignments.
///
/// NOTE(review): presumably filled once per row from
/// `CompiledUpdatePlan::dynamic_assignments`; the producer is outside this
/// view — confirm. `Default` yields two empty vectors.
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
    /// Evaluated `(column, value)` pairs for regular fields.
61    dynamic_field_assignments: Vec<(String, Value)>,
    /// Evaluated `(key, value)` pairs for metadata.
62    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66    /// Issue #524 — public read of the in-memory chain tip. Returns `None`
67    /// when the collection is not a chain or has no rows (pre-genesis). On a
68    /// cold cache the first call falls back to a one-time scan so the HTTP
69    /// `GET /collections/:name/chain-tip` handler stays consistent with the
70    /// INSERT path after a restart.
71    pub fn chain_tip_for_collection(
72        &self,
73        collection: &str,
74    ) -> Option<crate::runtime::blockchain_kind::ChainTipFull> {
75        let store = self.inner.db.store();
76        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
77            return None;
78        }
79        let mut cache = self.inner.chain_tip_cache.lock();
80        if let Some(existing) = cache.get(collection) {
81            return Some(existing.clone());
82        }
83        let scanned = crate::runtime::blockchain_kind::chain_tip_full(&store, collection)?;
84        cache.insert(collection.to_string(), scanned.clone());
85        Some(scanned)
86    }
87
88    /// Issue #525 — walks the chain end-to-end, recomputes each block's hash
89    /// against the stored fields, and returns the verification outcome.  On
90    /// `ok == false` the integrity flag is persisted and the in-memory cache
91    /// is updated so subsequent INSERTs surface `ChainIntegrityBroken`.
92    ///
93    /// Returns `None` when the collection is absent or not a `KIND blockchain`.
94    pub fn verify_chain_for_collection(
95        &self,
96        collection: &str,
97    ) -> Option<crate::runtime::blockchain_kind::VerifyChainOutcome> {
98        let store = self.inner.db.store();
99        let outcome = crate::runtime::blockchain_kind::verify_chain_outcome(&store, collection)?;
100        if !outcome.ok {
101            crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, true);
102            self.inner
103                .chain_integrity_broken
104                .lock()
105                .insert(collection.to_string(), true);
106        }
107        Some(outcome)
108    }
109
110    /// Issue #525 — admin clears the `ChainIntegrityBroken` flag so the chain
111    /// accepts INSERTs again.  Returns `false` when the collection is not a
112    /// chain.
113    pub fn clear_chain_integrity_flag(&self, collection: &str) -> bool {
114        let store = self.inner.db.store();
115        if !crate::runtime::blockchain_kind::is_chain(&store, collection) {
116            return false;
117        }
118        crate::runtime::blockchain_kind::persist_integrity_flag(&store, collection, false);
119        self.inner
120            .chain_integrity_broken
121            .lock()
122            .insert(collection.to_string(), false);
123        true
124    }
125
126    /// Issue #525 — INSERT-time check.  Combines in-memory cache (fast path)
127    /// with a one-time scan of `red_config` on cold start so the flag survives
128    /// restart.
129    fn is_chain_integrity_broken(&self, collection: &str) -> bool {
130        {
131            let cache = self.inner.chain_integrity_broken.lock();
132            if let Some(v) = cache.get(collection) {
133                return *v;
134            }
135        }
136        let store = self.inner.db.store();
137        let persisted =
138            crate::runtime::blockchain_kind::is_integrity_broken_persisted(&store, collection)
139                .unwrap_or(false);
140        self.inner
141            .chain_integrity_broken
142            .lock()
143            .insert(collection.to_string(), persisted);
144        persisted
145    }
146
147    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
148    /// target table is tenant-scoped and the user's column list does
149    /// not already name the tenant column.
150    ///
151    /// Returns:
152    /// * `Ok(None)` — no injection needed (non-tenant table, or user
153    ///   supplied the column explicitly). Caller uses the original
154    ///   query unchanged.
155    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
156    ///   + literal value appended to every row.
157    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
158    ///   the current session. Fails loudly so callers don't produce
159    ///   rows that RLS would then hide on read.
160    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
161        let Some(tenant_col) = self.tenant_column(&query.table) else {
162            return Ok(None);
163        };
164        // User already named the column (literal match) — trust them.
165        if query
166            .columns
167            .iter()
168            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
169        {
170            return Ok(None);
171        }
172
173        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
174        // nested key like `headers.tenant` we operate on the root
175        // column (`headers`) and set / add the nested path inside its
176        // JSON value. If the user named the root column we mutate in
177        // place; otherwise we create a fresh JSON column for every row.
178        if let Some(dot_pos) = tenant_col.find('.') {
179            let (root, tail) = tenant_col.split_at(dot_pos);
180            let tail = &tail[1..]; // drop leading '.'
181            return self.inject_dotted_tenant(query, root, tail);
182        }
183
184        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
185            return Err(RedDBError::Query(format!(
186                "INSERT into tenant-scoped table '{}' requires an active tenant — \
187                 run SET TENANT '<id>' first or name column '{}' explicitly",
188                query.table, tenant_col
189            )));
190        };
191
192        let mut augmented = query.clone();
193        augmented.columns.push(tenant_col);
194        let lit = Value::text(tenant_id.clone());
195        for row in augmented.values.iter_mut() {
196            row.push(lit.clone());
197        }
198        for row in augmented.value_exprs.iter_mut() {
199            row.push(crate::storage::query::ast::Expr::Literal {
200                value: lit.clone(),
201                span: crate::storage::query::ast::Span::synthetic(),
202            });
203        }
204        Ok(Some(augmented))
205    }
206
207    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
208    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
209    /// nested JSON instead of appending a flat column.
210    ///
211    /// Cases:
212    /// * Root column already in the INSERT list → mutate per-row JSON
213    ///   (parse, set path, re-serialize).
214    /// * Root column absent → create a fresh `{tail: tenant}` JSON
215    ///   object and append the root column to the INSERT.
216    fn inject_dotted_tenant(
217        &self,
218        query: &InsertQuery,
219        root: &str,
220        tail: &str,
221    ) -> RedDBResult<Option<InsertQuery>> {
222        let active_tenant = crate::runtime::impl_core::current_tenant();
223        let mut augmented = query.clone();
224        let root_idx = augmented
225            .columns
226            .iter()
227            .position(|c| c.eq_ignore_ascii_case(root));
228
229        if let Some(idx) = root_idx {
230            // User supplied the root column. Per-row: if the dotted
231            // tail is already present we trust the user (admin / bulk
232            // loader scenario); otherwise fill from the active
233            // tenant. An unbound tenant is only an error when some
234            // row actually needs filling.
235            for row in augmented.values.iter_mut() {
236                let Some(slot) = row.get_mut(idx) else {
237                    continue;
238                };
239                if dotted_tail_already_set(slot, tail) {
240                    continue;
241                }
242                let Some(tenant_id) = &active_tenant else {
243                    return Err(RedDBError::Query(format!(
244                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
245                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
246                        query.table, root, tail
247                    )));
248                };
249                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
250            }
251            // Expression row is kept in sync by re-wrapping the
252            // mutated literal; the canonical path will re-evaluate
253            // against the same JSON shape.
254            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
255                if let Some(slot) = row.get_mut(idx) {
256                    let new_value = augmented
257                        .values
258                        .get(row_idx)
259                        .and_then(|v| v.get(idx))
260                        .cloned()
261                        .unwrap_or(Value::Null);
262                    *slot = crate::storage::query::ast::Expr::Literal {
263                        value: new_value,
264                        span: crate::storage::query::ast::Span::synthetic(),
265                    };
266                }
267            }
268        } else {
269            // No root column in the INSERT list — auto-fill needs a
270            // bound tenant to synthesise one. Error loud so we never
271            // create a tenant-less row that RLS would then hide.
272            let Some(tenant_id) = &active_tenant else {
273                return Err(RedDBError::Query(format!(
274                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
275                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
276                    query.table, root, tail
277                )));
278            };
279            // Create a fresh JSON column with only the tenant path set.
280            augmented.columns.push(root.to_string());
281            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
282            for row in augmented.values.iter_mut() {
283                row.push(fresh.clone());
284            }
285            for row in augmented.value_exprs.iter_mut() {
286                row.push(crate::storage::query::ast::Expr::Literal {
287                    value: fresh.clone(),
288                    span: crate::storage::query::ast::Span::synthetic(),
289                });
290            }
291        }
292
293        Ok(Some(augmented))
294    }
295
296    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
297    /// `lsns` is empty because events fire at commit time.
298    fn delete_entities_batch(
299        &self,
300        collection: &str,
301        ids: &[EntityId],
302    ) -> RedDBResult<(u64, Vec<u64>)> {
303        if ids.is_empty() {
304            return Ok((0, vec![]));
305        }
306
307        let store = self.db().store();
308        let Some(manager) = store.get_collection(collection) else {
309            return Ok((0, vec![]));
310        };
311
312        let active_xid = self.current_xid();
313        let conn_id = crate::runtime::impl_core::current_connection_id();
314        let mut autocommit_xid = None;
315        let mut tombstoned_ids = Vec::new();
316        let mut tombstoned_entities = Vec::new();
317        let mut physical_delete_ids = Vec::new();
318        let table_row_resolver =
319            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
320
321        for &id in ids {
322            let Some(mut entity) = manager.get(id) else {
323                continue;
324            };
325            if matches!(entity.data, EntityData::Row(_)) {
326                let previous_xmax = entity.xmax;
327                if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
328                    if table_row_resolver.resolve_candidate(&entity).is_none() {
329                        continue;
330                    }
331                } else if entity.xmax != 0 {
332                    continue;
333                }
334
335                let xid = match active_xid {
336                    Some(xid) => xid,
337                    None => match autocommit_xid {
338                        Some(xid) => xid,
339                        None => {
340                            let mgr = self.snapshot_manager();
341                            let xid = mgr.begin();
342                            autocommit_xid = Some(xid);
343                            xid
344                        }
345                    },
346                };
347                entity.set_xmax(xid);
348                if manager.update(entity.clone()).is_ok() {
349                    if active_xid.is_some() {
350                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
351                    }
352                    tombstoned_entities.push(entity);
353                    tombstoned_ids.push(id);
354                }
355            } else {
356                physical_delete_ids.push(id);
357            }
358        }
359
360        if let Some(xid) = autocommit_xid {
361            self.snapshot_manager().commit(xid);
362        }
363
364        let mut affected = tombstoned_ids.len() as u64;
365        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
366        if active_xid.is_some() {
367            store
368                .persist_entities_to_pager(collection, &tombstoned_entities)
369                .map_err(|err| RedDBError::Internal(err.to_string()))?;
370        } else {
371            store
372                .persist_entities_to_pager(collection, &tombstoned_entities)
373                .map_err(|err| RedDBError::Internal(err.to_string()))?;
374            for id in &tombstoned_ids {
375                store.context_index().remove_entity(*id);
376                let lsn = self.cdc_emit(
377                    crate::replication::cdc::ChangeOperation::Delete,
378                    collection,
379                    id.raw(),
380                    "entity",
381                );
382                lsns.push(lsn);
383            }
384        }
385
386        let deleted_ids = store
387            .delete_batch(collection, &physical_delete_ids)
388            .map_err(|err| RedDBError::Internal(err.to_string()))?;
389        affected += deleted_ids.len() as u64;
390        for id in &deleted_ids {
391            store.context_index().remove_entity(*id);
392            let lsn = self.cdc_emit(
393                crate::replication::cdc::ChangeOperation::Delete,
394                collection,
395                id.raw(),
396                "entity",
397            );
398            lsns.push(lsn);
399        }
400
401        Ok((affected, lsns))
402    }
403
404    /// Flushes context-index updates and CDC for each applied mutation.
405    /// Returns one LSN per entity in the same order as `applied`.
406    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
407        if applied.is_empty() {
408            return Ok(Vec::new());
409        }
410
411        let store = self.db().store();
412        if applied.iter().any(|item| item.context_index_dirty) {
413            store.context_index().index_entities(
414                &applied[0].collection,
415                applied
416                    .iter()
417                    .filter(|item| item.context_index_dirty)
418                    .map(|item| &item.entity),
419            );
420        }
421
422        for item in applied {
423            self.refresh_update_secondary_indexes(item)?;
424        }
425
426        let mut lsns = Vec::with_capacity(applied.len());
427        for item in applied {
428            let lsn = self.cdc_emit_prebuilt(
429                crate::replication::cdc::ChangeOperation::Update,
430                &item.collection,
431                &item.entity,
432                update_cdc_item_kind(self, &item.collection, &item.entity),
433                item.metadata.as_ref(),
434                false,
435            );
436            lsns.push(lsn);
437        }
438        Ok(lsns)
439    }
440
    /// Persists a chunk of applied UPDATE mutations by delegating to
    /// `persist_applied_entity_mutations`; the delegate's result is returned
    /// unchanged.
441    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
442        self.persist_applied_entity_mutations(applied)
443    }
444
445    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
446        if applied.pre_mutation_fields.is_empty() {
447            return Ok(());
448        }
449        let post = entity_row_fields_snapshot(&applied.entity);
450        if post.is_empty() {
451            return Ok(());
452        }
453
454        let indexed_cols = self
455            .index_store_ref()
456            .indexed_columns_set(&applied.collection);
457        if indexed_cols.is_empty() {
458            return Ok(());
459        }
460
461        if let Some(old_version) = applied.replaced_entity.as_ref() {
462            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
463                .pre_mutation_fields
464                .iter()
465                .filter(|(col, _)| indexed_cols.contains(col))
466                .cloned()
467                .collect();
468            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
469                .iter()
470                .filter(|(col, _)| indexed_cols.contains(col))
471                .cloned()
472                .collect();
473            if !old_index_fields.is_empty() {
474                self.index_store_ref()
475                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
476                    .map_err(crate::RedDBError::Internal)?;
477            }
478            if !new_index_fields.is_empty() {
479                self.index_store_ref()
480                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
481                    .map_err(crate::RedDBError::Internal)?;
482            }
483            return Ok(());
484        }
485
486        let damage =
487            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
488        if damage
489            .touched_columns()
490            .into_iter()
491            .any(|col| indexed_cols.contains(col))
492        {
493            self.index_store_ref()
494                .index_entity_update(
495                    &applied.collection,
496                    applied.id,
497                    &applied.pre_mutation_fields,
498                    &post,
499                )
500                .map_err(crate::RedDBError::Internal)?;
501        }
502        Ok(())
503    }
504
505    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
506    ///
507    /// Each row in `query.values` is zipped with `query.columns` to produce a
508    /// set of named fields, which is then dispatched based on entity_type.
509    pub fn execute_insert(
510        &self,
511        raw_query: &str,
512        query: &InsertQuery,
513    ) -> RedDBResult<RuntimeQueryResult> {
514        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
515        // CollectionContract gate (#49): single entry point for the
516        // operator's collection-level write rules. Today this is a
517        // no-op for INSERT (APPEND ONLY permits insert); routing
518        // through the gate now means future contract bits — versioned,
519        // vault-only writes — plug in once instead of per verb.
520        crate::runtime::collection_contract::CollectionContractGate::check(
521            self,
522            &query.table,
523            crate::runtime::collection_contract::MutationKind::Insert,
524        )?;
525        // Phase 2.5.4 table-scoped tenancy: if the target table is
526        // tenant-scoped and the user didn't name the tenant column,
527        // auto-inject it with the thread-local `CURRENT_TENANT()`
528        // value. When the column is named explicitly we trust the
529        // caller (useful for admin tooling that writes on behalf of
530        // specific tenants). An unbound tenant on an implicit-fill
531        // path errors up front rather than producing a row the RLS
532        // policy would silently hide.
533        let augmented_owned;
534        let query = match self.maybe_inject_tenant_column(query)? {
535            Some(new_q) => {
536                augmented_owned = new_q;
537                &augmented_owned
538            }
539            None => query,
540        };
541        self.check_insert_column_policy(query)?;
542        if let Some(ref embed_config) = query.auto_embed {
543            let provider = crate::ai::parse_provider(&embed_config.provider)?;
544            if matches!(provider, crate::ai::AiProvider::Local) {
545                // Issue #682 — pre-flight the local model registry before
546                // any row write. Missing model, uninstalled artifacts,
547                // wrong task, and disabled-feature failures surface as
548                // deterministic errors that leave the target collection
549                // untouched, satisfying the "no partial writes on
550                // embedding failure" criterion for the failure modes
551                // owned by the local provider.
552                let model_name = embed_config.model.as_deref().map(str::trim).unwrap_or("");
553                if model_name.is_empty() {
554                    return Err(RedDBError::Query(
555                        "AUTO EMBED with provider=local requires MODEL '<registered-model-name>'; \
556                         the local provider does not have an implicit default model"
557                            .to_string(),
558                    ));
559                }
560                crate::runtime::ai::local_embedding::preflight_local_embedding(
561                    &self.inner.db,
562                    model_name,
563                )?;
564            }
565        }
566
567        let mut inserted_count: u64 = 0;
568        let effective_rows =
569            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
570
571        // Ensure the collection exists (auto-create on first insert).
572        let store = self.inner.db.store();
573        let _ = store.get_or_create_collection(&query.table);
574        let declared_model = self
575            .db()
576            .collection_contract_arc(&query.table)
577            .map(|contract| contract.declared_model);
578
579        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
580            if query.returning.is_some() {
581                Some(Vec::with_capacity(effective_rows.len()))
582            } else {
583                None
584            };
585        let mut returning_result: Option<UnifiedResult> = None;
586
587        if matches!(query.entity_type, InsertEntityType::Row)
588            && !matches!(
589                declared_model,
590                Some(crate::catalog::CollectionModel::TimeSeries)
591            )
592        {
593            // Issue #523 + #524: blockchain collections seal each row into the
594            // chain. When the caller omits the reserved columns, the engine
595            // auto-fills (#523). When the caller supplies any reserved column,
596            // the values are validated against the current tip and a mismatch
597            // surfaces a `BlockchainConflict:` error mapped to HTTP 409 (#524).
598            //
599            // The whole batch runs under a per-collection chain lock so two
600            // concurrent submitters can't both bind to the same prev_hash —
601            // the loser observes the advanced tip and gets 409 with the new
602            // tip so it can retry.
603            let chain_mode = crate::runtime::blockchain_kind::is_chain(&store, &query.table);
604            let _chain_lock_arc: Option<Arc<parking_lot::Mutex<()>>> = if chain_mode {
605                Some(self.inner.rmw_locks.lock_for(&query.table, "__chain__"))
606            } else {
607                None
608            };
609            let _chain_guard = _chain_lock_arc.as_ref().map(|m| m.lock());
610
611            // Issue #525 — refuse new blocks if the chain has been marked
612            // `integrity = broken` until an admin clears the flag.
613            if chain_mode && self.is_chain_integrity_broken(&query.table) {
614                return Err(RedDBError::InvalidOperation(format!(
615                    "ChainIntegrityBroken: collection '{}' is locked until \
616                     POST /collections/{}/clear-integrity-flag is called by an admin",
617                    query.table, query.table
618                )));
619            }
620
621            // Pull the tip from the in-memory cache; fall back to a one-time
622            // scan if the cache hasn't seen this collection yet (cold start
623            // after restart). Cache is updated below as rows are sealed.
624            let mut chain_tip_full: Option<crate::runtime::blockchain_kind::ChainTipFull> =
625                if chain_mode {
626                    let mut cache = self.inner.chain_tip_cache.lock();
627                    if let Some(existing) = cache.get(&query.table) {
628                        Some(existing.clone())
629                    } else if let Some(scanned) =
630                        crate::runtime::blockchain_kind::chain_tip_full(&store, &query.table)
631                    {
632                        cache.insert(query.table.clone(), scanned.clone());
633                        Some(scanned)
634                    } else {
635                        None
636                    }
637                } else {
638                    None
639                };
640
641            let mut rows = Vec::with_capacity(effective_rows.len());
642            for row_values in &effective_rows {
643                if row_values.len() != query.columns.len() {
644                    return Err(RedDBError::Query(format!(
645                        "INSERT column count ({}) does not match value count ({})",
646                        query.columns.len(),
647                        row_values.len()
648                    )));
649                }
650                let (mut fields, mut metadata) =
651                    split_insert_metadata(self, &query.columns, row_values)?;
652                if chain_mode {
653                    use crate::runtime::blockchain_kind::{
654                        chain_conflict_error, COL_BLOCK_HEIGHT, COL_HASH, COL_PREV_HASH,
655                        COL_TIMESTAMP, RESERVED_COLUMNS,
656                    };
657                    let supplied_height = fields
658                        .iter()
659                        .find(|(k, _)| k == COL_BLOCK_HEIGHT)
660                        .map(|(_, v)| v.clone());
661                    let supplied_prev = fields
662                        .iter()
663                        .find(|(k, _)| k == COL_PREV_HASH)
664                        .map(|(_, v)| v.clone());
665                    let supplied_ts = fields
666                        .iter()
667                        .find(|(k, _)| k == COL_TIMESTAMP)
668                        .map(|(_, v)| v.clone());
669                    let supplied_hash = fields.iter().any(|(k, _)| k == COL_HASH);
670                    let user_supplied_any = supplied_height.is_some()
671                        || supplied_prev.is_some()
672                        || supplied_ts.is_some()
673                        || supplied_hash;
674
675                    fields.retain(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()));
676                    let payload = crate::runtime::blockchain_kind::canonical_payload(&fields);
677
678                    let (tip_prev_hash, tip_next_height) = match &chain_tip_full {
679                        Some(t) => (t.hash, t.height + 1),
680                        None => (crate::storage::blockchain::GENESIS_PREV_HASH, 0u64),
681                    };
682                    let server_now = crate::runtime::blockchain_kind::now_ms();
683
684                    let (use_prev, use_height, use_ts) = if user_supplied_any {
685                        // Caller is participating in the chain protocol —
686                        // every field must be supplied AND match the tip.
687                        if supplied_hash {
688                            return Err(chain_conflict_error(
689                                tip_next_height.saturating_sub(1),
690                                tip_prev_hash,
691                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
692                                server_now,
693                                "hash column is engine-computed and cannot be supplied",
694                            ));
695                        }
696                        let caller_prev = match &supplied_prev {
697                            Some(Value::Blob(b)) if b.len() == 32 => {
698                                let mut a = [0u8; 32];
699                                a.copy_from_slice(b);
700                                a
701                            }
702                            Some(Value::Text(s)) if s.len() == 64 => {
703                                // Accept hex-encoded prev_hash so JSON / SQL
704                                // callers without literal-blob syntax can
705                                // still participate in the chain protocol.
706                                let mut a = [0u8; 32];
707                                let mut ok = true;
708                                for (i, slot) in a.iter_mut().enumerate() {
709                                    let pair = &s.as_ref()[i * 2..i * 2 + 2];
710                                    match u8::from_str_radix(pair, 16) {
711                                        Ok(byte) => *slot = byte,
712                                        Err(_) => {
713                                            ok = false;
714                                            break;
715                                        }
716                                    }
717                                }
718                                if !ok {
719                                    return Err(chain_conflict_error(
720                                        tip_next_height.saturating_sub(1),
721                                        tip_prev_hash,
722                                        chain_tip_full
723                                            .as_ref()
724                                            .map(|t| t.timestamp_ms)
725                                            .unwrap_or(0),
726                                        server_now,
727                                        "prev_hash is not valid hex",
728                                    ));
729                                }
730                                a
731                            }
732                            _ => {
733                                return Err(chain_conflict_error(
734                                    tip_next_height.saturating_sub(1),
735                                    tip_prev_hash,
736                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
737                                    server_now,
738                                    "prev_hash missing or not a 32-byte Blob",
739                                ));
740                            }
741                        };
742                        if caller_prev != tip_prev_hash {
743                            return Err(chain_conflict_error(
744                                tip_next_height.saturating_sub(1),
745                                tip_prev_hash,
746                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
747                                server_now,
748                                "prev_hash does not match current tip",
749                            ));
750                        }
751                        let caller_height = match &supplied_height {
752                            Some(Value::UnsignedInteger(v)) => *v,
753                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
754                            _ => {
755                                return Err(chain_conflict_error(
756                                    tip_next_height.saturating_sub(1),
757                                    tip_prev_hash,
758                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
759                                    server_now,
760                                    "block_height missing or not an unsigned integer",
761                                ));
762                            }
763                        };
764                        if caller_height != tip_next_height {
765                            return Err(chain_conflict_error(
766                                tip_next_height.saturating_sub(1),
767                                tip_prev_hash,
768                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
769                                server_now,
770                                "block_height does not match tip+1",
771                            ));
772                        }
773                        let caller_ts = match &supplied_ts {
774                            Some(Value::UnsignedInteger(v)) => *v,
775                            Some(Value::Integer(v)) if *v >= 0 => *v as u64,
776                            _ => {
777                                return Err(chain_conflict_error(
778                                    tip_next_height.saturating_sub(1),
779                                    tip_prev_hash,
780                                    chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
781                                    server_now,
782                                    "timestamp missing or not an unsigned integer",
783                                ));
784                            }
785                        };
786                        let drift = (caller_ts as i128) - (server_now as i128);
787                        if drift.abs() > 60_000 {
788                            return Err(chain_conflict_error(
789                                tip_next_height.saturating_sub(1),
790                                tip_prev_hash,
791                                chain_tip_full.as_ref().map(|t| t.timestamp_ms).unwrap_or(0),
792                                server_now,
793                                "timestamp outside ±60s of server_time",
794                            ));
795                        }
796                        (caller_prev, caller_height, caller_ts)
797                    } else {
798                        (tip_prev_hash, tip_next_height, server_now)
799                    };
800
801                    let (reserved, new_hash) =
802                        crate::runtime::blockchain_kind::make_block_reserved_fields(
803                            use_prev, use_height, use_ts, &payload,
804                        );
805                    fields.extend(reserved);
806                    chain_tip_full = Some(crate::runtime::blockchain_kind::ChainTipFull {
807                        height: use_height,
808                        hash: new_hash,
809                        timestamp_ms: use_ts,
810                    });
811                }
812                // Issue #522 — signed-writes verification. On collections
813                // created with `SIGNED_BY (...)` the row must carry valid
814                // `signer_pubkey` + `signature` reserved columns. Runs
815                // after chain_mode so canonical payload covers user-supplied
816                // fields only (blockchain reserved columns are filtered by
817                // `canonical_payload`; the two signed-writes reserved
818                // columns are split out before payload computation, then
819                // re-attached for storage). The blockchain + SIGNED_BY
820                // composition is owned by issue #526; we keep #522 to the
821                // non-chain path and let chain_mode collections punt to that
822                // slice rather than half-wire it here.
823                if crate::runtime::signed_writes_kind::is_signed(&store, &query.table) {
824                    let (pk_col, sig_col, residual) =
825                        crate::runtime::signed_writes_kind::split_signature_fields(fields);
826                    let payload = crate::runtime::blockchain_kind::canonical_payload(&residual);
827                    let reg = crate::runtime::signed_writes_kind::registry(&store, &query.table);
828                    crate::runtime::signed_writes_kind::verify_row(
829                        &reg,
830                        pk_col.as_ref().map(|c| c.bytes.as_slice()),
831                        sig_col.as_ref().map(|c| c.bytes.as_slice()),
832                        &payload,
833                    )
834                    .map_err(crate::runtime::signed_writes_kind::map_error)?;
835                    fields = residual;
836                    // Round-trip the reserved columns with the value
837                    // type the caller supplied (Text/hex on the SQL path,
838                    // Blob on the binary path). Keeps SELECT and WHERE
839                    // predicates symmetric with the INSERT shape.
840                    if let Some(col) = pk_col {
841                        fields.push((
842                            crate::storage::signed_writes::RESERVED_SIGNER_PUBKEY_COL.to_string(),
843                            col.raw_value,
844                        ));
845                    }
846                    if let Some(col) = sig_col {
847                        fields.push((
848                            crate::storage::signed_writes::RESERVED_SIGNATURE_COL.to_string(),
849                            col.raw_value,
850                        ));
851                    }
852                }
853                merge_with_clauses(
854                    &mut metadata,
855                    query.ttl_ms,
856                    query.expires_at_ms,
857                    &query.with_metadata,
858                );
859                if let Some(snaps) = returning_snapshots.as_mut() {
860                    snaps.push(fields.clone());
861                }
862                rows.push(CreateRowInput {
863                    collection: query.table.clone(),
864                    fields,
865                    metadata,
866                    node_links: Vec::new(),
867                    vector_links: Vec::new(),
868                });
869            }
870            let outputs = self.create_rows_batch(CreateRowsBatchInput {
871                collection: query.table.clone(),
872                rows,
873                suppress_events: query.suppress_events,
874            })?;
875            inserted_count = outputs.len() as u64;
876
877            // Chain mode: commit the new tip to the in-memory cache only after
878            // the batch persisted successfully. If the batch failed, the `?` above
879            // already returned, so the cache stays on the previous tip and the chain lock releases.
880            if chain_mode {
881                if let Some(new_tip) = chain_tip_full.as_ref() {
882                    self.inner
883                        .chain_tip_cache
884                        .lock()
885                        .insert(query.table.clone(), new_tip.clone());
886                }
887            }
888
889            // Hypertable chunk routing: if this table was declared via
890            // CREATE HYPERTABLE, register each row's time-column value
891            // with the registry so chunk metadata (bounds, row counts,
892            // TTL eligibility) stays current. This is what lets
893            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
894            // retention daemon sweep expired chunks without scanning
895            // every row.
896            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
897                let time_col = &spec.time_column;
898                // Find the column's index in the INSERT column list.
899                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
900                    for row in &effective_rows {
901                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
902                            if *n >= 0 {
903                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
904                            }
905                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
906                            let _ = self.inner.db.hypertables().route(&query.table, *n);
907                        }
908                    }
909                }
910            }
911
912            if let (Some(items), Some(snaps)) =
913                (query.returning.as_ref(), returning_snapshots.take())
914            {
915                let snaps = row_insert_returning_snapshots(&outputs, snaps);
916                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
917            }
918        } else {
919            // Issue #419: surface the inserted entity id on every INSERT path.
920            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
921            // so a RETURNING clause (and the unconditional inserted_ids list,
922            // below) can expose the engine-assigned id. TimeSeries (the row
923            // branch in this else) still returns the not-supported error
924            // because create_timeseries_point isn't plumbed through this fn.
925            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
926                Vec::with_capacity(effective_rows.len());
927            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
928            {
929                Vec::with_capacity(effective_rows.len())
930            } else {
931                Vec::new()
932            };
933            if matches!(
934                query.entity_type,
935                InsertEntityType::Node | InsertEntityType::Edge
936            ) {
937                enum PreparedGraphInsert { // One validated NODE/EDGE row, staged so every row is checked before the batch is built.
938                    Node {
939                        fields: Vec<(String, Value)>, // first element returned by `split_insert_metadata`; pushed to the RETURNING snapshots when RETURNING is set
940                        input: CreateNodeInput, // node payload later handed to `add_node_with_type` on the batch
941                    },
942                    Edge {
943                        fields: Vec<(String, Value)>, // first element returned by `split_insert_metadata`; pushed to the RETURNING snapshots when RETURNING is set
944                        input: CreateEdgeInput, // edge payload later handed to `add_edge` on the batch
945                    },
946                }
947
948                let mut prepared = Vec::with_capacity(effective_rows.len());
949                for row_values in &effective_rows {
950                    if row_values.len() != query.columns.len() {
951                        return Err(RedDBError::Query(format!(
952                            "INSERT column count ({}) does not match value count ({})",
953                            query.columns.len(),
954                            row_values.len()
955                        )));
956                    }
957
958                    match query.entity_type {
959                        InsertEntityType::Node => {
960                            let (node_values, mut metadata) =
961                                split_insert_metadata(self, &query.columns, row_values)?;
962                            merge_with_clauses(
963                                &mut metadata,
964                                query.ttl_ms,
965                                query.expires_at_ms,
966                                &query.with_metadata,
967                            );
968                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
969                            apply_collection_default_ttl_metadata(
970                                self,
971                                &query.table,
972                                &mut metadata,
973                            );
974                            let (columns, values) = pairwise_columns_values(&node_values);
975                            let label = find_column_value_string(&columns, &values, "label")?;
976                            let node_type =
977                                find_column_value_opt_string(&columns, &values, "node_type");
978                            let properties = extract_remaining_properties(
979                                &columns,
980                                &values,
981                                &["label", "node_type"],
982                            );
983                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
984                                properties.iter().map(|(key, _)| key.as_str()),
985                                &format!("node '{}'", query.table),
986                            )?;
987                            prepared.push(PreparedGraphInsert::Node {
988                                fields: node_values,
989                                input: CreateNodeInput {
990                                    collection: query.table.clone(),
991                                    label,
992                                    node_type,
993                                    properties,
994                                    metadata,
995                                    embeddings: Vec::new(),
996                                    table_links: Vec::new(),
997                                    node_links: Vec::new(),
998                                },
999                            });
1000                        }
1001                        InsertEntityType::Edge => {
1002                            let (edge_values, mut metadata) =
1003                                split_insert_metadata(self, &query.columns, row_values)?;
1004                            merge_with_clauses(
1005                                &mut metadata,
1006                                query.ttl_ms,
1007                                query.expires_at_ms,
1008                                &query.with_metadata,
1009                            );
1010                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
1011                            apply_collection_default_ttl_metadata(
1012                                self,
1013                                &query.table,
1014                                &mut metadata,
1015                            );
1016                            let (columns, values) = pairwise_columns_values(&edge_values);
1017                            let label = find_column_value_string(&columns, &values, "label")?;
1018                            ensure_non_tree_structural_edge_label(&label)?;
1019                            let from_id = resolve_edge_endpoint_any(
1020                                self.inner.db.store().as_ref(),
1021                                &query.table,
1022                                &columns,
1023                                &values,
1024                                &["from_rid", "from"],
1025                            )?;
1026                            let to_id = resolve_edge_endpoint_any(
1027                                self.inner.db.store().as_ref(),
1028                                &query.table,
1029                                &columns,
1030                                &values,
1031                                &["to_rid", "to"],
1032                            )?;
1033                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
1034                            let properties = extract_remaining_properties(
1035                                &columns,
1036                                &values,
1037                                &["label", "from_rid", "to_rid", "from", "to", "weight"],
1038                            );
1039                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
1040                                properties.iter().map(|(key, _)| key.as_str()),
1041                                &format!("edge '{}'", query.table),
1042                            )?;
1043                            prepared.push(PreparedGraphInsert::Edge {
1044                                fields: edge_values,
1045                                input: CreateEdgeInput {
1046                                    collection: query.table.clone(),
1047                                    label,
1048                                    from: EntityId::new(from_id),
1049                                    to: EntityId::new(to_id),
1050                                    weight,
1051                                    properties,
1052                                    metadata,
1053                                },
1054                            });
1055                        }
1056                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1057                    }
1058                }
1059
1060                ensure_graph_insert_contract(self, &query.table)?;
1061                let mut batch = self.inner.db.batch();
1062                for item in prepared {
1063                    match item {
1064                        PreparedGraphInsert::Node { fields, input } => {
1065                            if query.returning.is_some() {
1066                                returning_field_snaps.push(fields);
1067                            }
1068                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
1069                            batch = batch.add_node_with_type(
1070                                input.collection,
1071                                input.label,
1072                                node_type,
1073                                input.properties.into_iter().collect(),
1074                                input.metadata.into_iter().collect(),
1075                            );
1076                        }
1077                        PreparedGraphInsert::Edge { fields, input } => {
1078                            if query.returning.is_some() {
1079                                returning_field_snaps.push(fields);
1080                            }
1081                            batch = batch.add_edge(
1082                                input.collection,
1083                                input.label,
1084                                input.from,
1085                                input.to,
1086                                input.weight.unwrap_or(1.0),
1087                                input.properties.into_iter().collect(),
1088                                input.metadata.into_iter().collect(),
1089                            );
1090                        }
1091                    }
1092                }
1093                let batch_result = batch
1094                    .execute()
1095                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
1096                let (ids, entity_kind) = match query.entity_type {
1097                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
1098                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
1099                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
1100                };
1101                for id in &ids {
1102                    self.stamp_xmin_if_in_txn(&query.table, *id);
1103                }
1104                if query.returning.is_some() {
1105                    returning_field_snaps = graph_insert_returning_snapshots(
1106                        self.inner.db.store().as_ref(),
1107                        &query.table,
1108                        &ids,
1109                    );
1110                }
1111                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
1112                let store = self.inner.db.store();
1113                entity_outputs.extend(ids.iter().map(|id| {
1114                    crate::application::entity::CreateEntityOutput {
1115                        id: *id,
1116                        entity: store.get(&query.table, *id),
1117                    }
1118                }));
1119                inserted_count = ids.len() as u64;
1120            } else {
1121                for row_values in &effective_rows {
1122                    if row_values.len() != query.columns.len() {
1123                        return Err(RedDBError::Query(format!(
1124                            "INSERT column count ({}) does not match value count ({})",
1125                            query.columns.len(),
1126                            row_values.len()
1127                        )));
1128                    }
1129
1130                    match query.entity_type {
1131                        InsertEntityType::Row => {
1132                            if query.returning.is_some() {
1133                                return Err(RedDBError::Query(
1134                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
1135                                    .to_string(),
1136                            ));
1137                            }
1138                            let (fields, mut metadata) =
1139                                split_insert_metadata(self, &query.columns, row_values)?;
1140                            merge_with_clauses(
1141                                &mut metadata,
1142                                query.ttl_ms,
1143                                query.expires_at_ms,
1144                                &query.with_metadata,
1145                            );
1146                            self.insert_timeseries_point(&query.table, fields, metadata)?;
1147                        }
1148                        InsertEntityType::Node | InsertEntityType::Edge => {
1149                            unreachable!("NODE and EDGE are handled by the prepared graph path")
1150                        }
1151                        InsertEntityType::Vector => {
1152                            let (vector_values, mut metadata) =
1153                                split_insert_metadata(self, &query.columns, row_values)?;
1154                            merge_with_clauses(
1155                                &mut metadata,
1156                                query.ttl_ms,
1157                                query.expires_at_ms,
1158                                &query.with_metadata,
1159                            );
1160                            let (columns, values) = pairwise_columns_values(&vector_values);
1161                            let dense = find_column_value_vec_f32_any(
1162                                &columns,
1163                                &values,
1164                                &["dense", "embedding"],
1165                            )?;
1166                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
1167                            let content =
1168                                find_column_value_opt_string(&columns, &values, "content");
1169                            if query.returning.is_some() {
1170                                returning_field_snaps.push(vector_values.clone());
1171                            }
1172                            let input = CreateVectorInput {
1173                                collection: query.table.clone(),
1174                                dense,
1175                                content,
1176                                metadata,
1177                                link_row: None,
1178                                link_node: None,
1179                            };
1180                            entity_outputs.push(self.create_vector(input)?);
1181                        }
1182                        InsertEntityType::Document => {
1183                            let (document_values, mut metadata) =
1184                                split_insert_metadata(self, &query.columns, row_values)?;
1185                            merge_with_clauses(
1186                                &mut metadata,
1187                                query.ttl_ms,
1188                                query.expires_at_ms,
1189                                &query.with_metadata,
1190                            );
1191                            let (columns, values) = pairwise_columns_values(&document_values);
1192                            let body_str = find_column_value_string(&columns, &values, "body")?;
1193                            let body: crate::json::Value = crate::json::from_str(&body_str)
1194                                .map_err(|e| {
1195                                    RedDBError::Query(format!("invalid JSON body: {e}"))
1196                                })?;
1197                            let input = CreateDocumentInput {
1198                                collection: query.table.clone(),
1199                                body,
1200                                metadata,
1201                                node_links: Vec::new(),
1202                                vector_links: Vec::new(),
1203                            };
1204                            let output = self.create_document(input)?;
1205                            if query.returning.is_some() {
1206                                let fields = output
1207                                    .entity
1208                                    .as_ref()
1209                                    .map(entity_row_fields_snapshot)
1210                                    .filter(|fields| !fields.is_empty())
1211                                    .unwrap_or(document_values);
1212                                returning_field_snaps.push(fields);
1213                            }
1214                            entity_outputs.push(output);
1215                        }
1216                        InsertEntityType::Kv => {
1217                            let (kv_values, mut metadata) =
1218                                split_insert_metadata(self, &query.columns, row_values)?;
1219                            merge_with_clauses(
1220                                &mut metadata,
1221                                query.ttl_ms,
1222                                query.expires_at_ms,
1223                                &query.with_metadata,
1224                            );
1225                            let (columns, values) = pairwise_columns_values(&kv_values);
1226                            let key = find_column_value_string(&columns, &values, "key")?;
1227                            let value = find_column_value(&columns, &values, "value")?;
1228                            if query.returning.is_some() {
1229                                returning_field_snaps.push(kv_values.clone());
1230                            }
1231                            let input = CreateKvInput {
1232                                collection: query.table.clone(),
1233                                key,
1234                                value,
1235                                metadata,
1236                            };
1237                            entity_outputs.push(self.create_kv(input)?);
1238                        }
1239                    }
1240
1241                    inserted_count += 1;
1242                }
1243            }
1244
1245            if let Some(items) = query.returning.as_ref() {
1246                if !entity_outputs.is_empty() {
1247                    returning_result = Some(build_returning_result(
1248                        items,
1249                        &returning_field_snaps,
1250                        Some(&entity_outputs),
1251                    ));
1252                }
1253            }
1254        }
1255
1256        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
1257        if let Some(ref embed_config) = query.auto_embed {
1258            let store = self.inner.db.store();
1259            let provider = crate::ai::parse_provider(&embed_config.provider)?;
1260            let is_local_provider = matches!(provider, crate::ai::AiProvider::Local);
1261            // Local provider runs in-process — no API key path applies.
1262            // The pre-flight above already required `MODEL '<name>'`
1263            // for the local case, so the unwrap_or default below only
1264            // ever fires for OpenAI-compatible providers.
1265            let api_key = if is_local_provider {
1266                String::new()
1267            } else {
1268                crate::ai::resolve_api_key_from_runtime(&provider, None, self)?
1269            };
1270            let model = embed_config.model.clone().unwrap_or_else(|| {
1271                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
1272                    .ok()
1273                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
1274            });
1275
1276            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
1277            let manager = store
1278                .get_collection(&query.table)
1279                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1280            let entities = manager.query_all(|_| true);
1281            let recent: Vec<_> = entities
1282                .into_iter()
1283                .rev()
1284                .take(effective_rows.len())
1285                .collect();
1286
1287            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
1288            let entity_combos: Vec<(usize, String)> = recent
1289                .iter()
1290                .enumerate()
1291                .filter_map(|(i, entity)| {
1292                    if let EntityData::Row(ref row) = entity.data {
1293                        if let Some(ref named) = row.named {
1294                            let texts: Vec<String> = embed_config
1295                                .fields
1296                                .iter()
1297                                .filter_map(|field| match named.get(field) {
1298                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
1299                                    _ => None,
1300                                })
1301                                .collect();
1302                            if !texts.is_empty() {
1303                                return Some((i, texts.join(" ")));
1304                            }
1305                        }
1306                    }
1307                    None
1308                })
1309                .collect();
1310
1311            if !entity_combos.is_empty() {
1312                // Batch phase: single provider round-trip for all rows.
1313                let batch_texts: Vec<String> =
1314                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
1315
1316                // Issue #682 — when the provider is `local`, bypass
1317                // AiBatchClient (which is HTTP-only) and dispatch
1318                // directly through the in-process local embedding
1319                // backend. All texts go in one call, mirroring the
1320                // single-round-trip shape of the remote path. The
1321                // local backend does not perform intra-batch dedup —
1322                // each input position gets its own row in the output
1323                // — which keeps the per-row "create_vector" loop
1324                // below correct without additional fan-out logic.
1325                let embeddings = if is_local_provider {
1326                    let response = crate::runtime::ai::local_embedding::embed_local_with_db(
1327                        &self.inner.db,
1328                        &model,
1329                        batch_texts,
1330                    )?;
1331                    response.embeddings
1332                } else {
1333                    let batch_client =
1334                        crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
1335
1336                    match tokio::runtime::Handle::try_current() {
1337                        Ok(handle) => tokio::task::block_in_place(|| {
1338                            handle.block_on(batch_client.embed_batch(
1339                                &provider,
1340                                &model,
1341                                &api_key,
1342                                batch_texts,
1343                            ))
1344                        }),
1345                        Err(_) => {
1346                            return Err(RedDBError::Query(
1347                                "AUTO EMBED requires a Tokio runtime context".to_string(),
1348                            ));
1349                        }
1350                    }
1351                    .map_err(|e| RedDBError::Query(e.to_string()))?
1352                };
1353
1354                // Distribute phase: persist one vector per non-empty embedding.
1355                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
1356                    if dense.is_empty() {
1357                        continue;
1358                    }
1359                    self.create_vector(CreateVectorInput {
1360                        collection: query.table.clone(),
1361                        dense,
1362                        content: Some(combined.clone()),
1363                        metadata: Vec::new(),
1364                        link_row: None,
1365                        link_node: None,
1366                    })?;
1367                }
1368            }
1369        }
1370
1371        if inserted_count > 0 {
1372            self.note_table_write(&query.table);
1373        }
1374
1375        let mut result = RuntimeQueryResult::dml_result(
1376            raw_query.to_string(),
1377            inserted_count,
1378            "insert",
1379            "runtime-dml",
1380        );
1381        if let Some(returning) = returning_result {
1382            result.result = returning;
1383        }
1384        Ok(result)
1385    }
1386
1387    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
1388        let Some(auth_store) = self.inner.auth_store.read().clone() else {
1389            return Ok(());
1390        };
1391        if !auth_store.iam_authorization_enabled() {
1392            return Ok(());
1393        }
1394        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1395            return Ok(());
1396        };
1397
1398        let tenant = crate::runtime::impl_core::current_tenant();
1399        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1400        let request = crate::auth::ColumnAccessRequest {
1401            action: "insert".to_string(),
1402            schema: None,
1403            table: query.table.clone(),
1404            columns: query.columns.clone(),
1405        };
1406        let ctx = crate::auth::policies::EvalContext {
1407            principal_tenant: tenant.clone(),
1408            current_tenant: tenant,
1409            peer_ip: None,
1410            mfa_present: false,
1411            now_ms: crate::auth::now_ms(),
1412            principal_is_admin_role: role == crate::auth::Role::Admin,
1413            principal_is_system_owned: auth_store.principal_is_system_owned(&principal),
1414            principal_is_platform_scoped: principal.tenant.is_none(),
1415        };
1416
1417        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1418        let table_allowed = matches!(
1419            outcome.table_decision,
1420            crate::auth::policies::Decision::Allow { .. }
1421                | crate::auth::policies::Decision::AdminBypass
1422        );
1423        if !table_allowed {
1424            return Err(RedDBError::Query(format!(
1425                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1426                outcome.table_resource.kind, outcome.table_resource.name
1427            )));
1428        }
1429        if let Some(denied) = outcome.first_denied_column() {
1430            return Err(RedDBError::Query(format!(
1431                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1432                denied.resource.kind, denied.resource.name
1433            )));
1434        }
1435
1436        Ok(())
1437    }
1438
1439    pub(crate) fn insert_timeseries_point(
1440        &self,
1441        collection: &str,
1442        fields: Vec<(String, Value)>,
1443        mut metadata: Vec<(String, MetadataValue)>,
1444    ) -> RedDBResult<EntityId> {
1445        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1446
1447        let (columns, values) = pairwise_columns_values(&fields);
1448        validate_timeseries_insert_columns(&columns)?;
1449
1450        // Issue #577 — AnalyticsSchemaRegistry hook. If the row carries
1451        // an `event_name` whose schema is registered, validate the
1452        // `payload` JSON against it BEFORE any write side-effect. On
1453        // failure we return a typed error and the row is not
1454        // persisted. When no schema is registered for the event name
1455        // (or no `event_name` column is supplied at all) we fall
1456        // through to the normal write path for back-compat with
1457        // existing timeseries rows.
1458        let event_name_opt = find_column_value_opt_string(&columns, &values, "event_name");
1459        let payload_opt = find_column_value_opt_string(&columns, &values, "payload");
1460        if let Some(event_name) = event_name_opt.as_deref() {
1461            let store_for_schema = self.inner.db.store();
1462            if super::analytics_schema_registry::latest(store_for_schema.as_ref(), event_name)
1463                .is_some()
1464            {
1465                let payload_json = payload_opt.as_deref().unwrap_or("{}");
1466                super::analytics_schema_registry::validate(
1467                    store_for_schema.as_ref(),
1468                    event_name,
1469                    payload_json,
1470                )
1471                .map_err(super::analytics_schema_registry::validation_error_to_reddb)?;
1472            }
1473        }
1474
1475        // `metric` is required by the existing timeseries write path;
1476        // when an analytics-style row supplies `event_name` but not
1477        // `metric`, fall back to the event name so the storage path
1478        // still has a non-empty metric tag.
1479        let metric = match find_column_value_opt_string(&columns, &values, "metric") {
1480            Some(m) => m,
1481            None => event_name_opt.clone().ok_or_else(|| {
1482                RedDBError::Query(
1483                    "timeseries INSERT requires either `metric` or `event_name`".to_string(),
1484                )
1485            })?,
1486        };
1487        // `value` is optional for analytics-event rows (which are
1488        // semantically counts of 1); default to 1.0 when missing so
1489        // analytics inserts don't have to fabricate a metric value.
1490        let value = match find_column_value_opt_string(&columns, &values, "value") {
1491            Some(s) => s.parse::<f64>().unwrap_or(1.0),
1492            None => columns
1493                .iter()
1494                .position(|c| c.eq_ignore_ascii_case("value"))
1495                .and_then(|i| match &values[i] {
1496                    Value::Float(f) => Some(*f),
1497                    Value::Integer(n) | Value::BigInt(n) => Some(*n as f64),
1498                    Value::UnsignedInteger(n) => Some(*n as f64),
1499                    _ => None,
1500                })
1501                .unwrap_or(1.0),
1502        };
1503        let timestamp_ns =
1504            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1505        let mut tags = find_timeseries_tags(&columns, &values)?;
1506        if let Some(ref name) = event_name_opt {
1507            tags.entry("event_name".to_string())
1508                .or_insert_with(|| name.clone());
1509        }
1510        if let Some(ref payload) = payload_opt {
1511            tags.entry("payload".to_string())
1512                .or_insert_with(|| payload.clone());
1513        }
1514
1515        let mut entity = UnifiedEntity::new(
1516            EntityId::new(0),
1517            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1518                series: collection.to_string(),
1519                metric: metric.clone(),
1520            })),
1521            EntityData::TimeSeries(crate::storage::TimeSeriesData {
1522                metric,
1523                timestamp_ns,
1524                value,
1525                tags,
1526            }),
1527        );
1528        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
1529        // or an autocommit xid (allocated and committed up-front so
1530        // future snapshots see the row as soon as it lands).
1531        let writer_xid = match self.current_xid() {
1532            Some(xid) => xid,
1533            None => {
1534                let mgr = self.snapshot_manager();
1535                let xid = mgr.begin();
1536                mgr.commit(xid);
1537                xid
1538            }
1539        };
1540        entity.set_xmin(writer_xid);
1541
1542        let store = self.inner.db.store();
1543        let id = store
1544            .insert_auto(collection, entity)
1545            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1546
1547        if !metadata.is_empty() {
1548            let _ = store.set_metadata(
1549                collection,
1550                id,
1551                Metadata::with_fields(metadata.into_iter().collect()),
1552            );
1553        }
1554
1555        self.cdc_emit(
1556            crate::replication::cdc::ChangeOperation::Insert,
1557            collection,
1558            id.raw(),
1559            "timeseries",
1560        );
1561
1562        Ok(id)
1563    }
1564
1565    /// Execute UPDATE table SET col=val, ... WHERE filter
1566    ///
1567    /// Scans the target collection, evaluates the WHERE filter against each
1568    /// record, and patches every matching entity.
1569    pub fn execute_update(
1570        &self,
1571        raw_query: &str,
1572        query: &UpdateQuery,
1573    ) -> RedDBResult<RuntimeQueryResult> {
1574        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
1575        // Issue #523 — blockchain collections are immutable. Reject before
1576        // RLS / RETURNING work so the operator sees a clean 409-mapped
1577        // error instead of a partially-applied mutation surface.
1578        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
1579            return Err(RedDBError::InvalidOperation(format!(
1580                "BlockchainCollectionImmutable: UPDATE not allowed on '{}'",
1581                query.table
1582            )));
1583        }
1584        // CollectionContract gate (#50): runs the APPEND ONLY guard
1585        // (and any future contract bits) before RLS / RETURNING work
1586        // so the operator's immutability declaration is honoured
1587        // uniformly and the error message points at the DDL rather
1588        // than at a downstream symptom.
1589        crate::runtime::collection_contract::CollectionContractGate::check(
1590            self,
1591            &query.table,
1592            crate::runtime::collection_contract::MutationKind::Update,
1593        )?;
1594        ensure_update_target_contract(self, &query.table, query.target)?;
1595        ensure_graph_identity_update_target_allowed(query)?;
1596
1597        // Apply RLS augmentation first so every downstream path — plain
1598        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
1599        // same policy-filtered target set. This prevents RETURNING
1600        // from ever exposing rows the UPDATE policy would have
1601        // denied.
1602        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
1603        let augmented_query: UpdateQuery;
1604        let effective_query: &UpdateQuery = if rls_gated {
1605            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
1606                self,
1607                &query.table,
1608                crate::storage::query::ast::PolicyAction::Update,
1609            );
1610            let Some(policy) = rls_filter else {
1611                // No admitting policy: zero rows affected, empty
1612                // RETURNING (never leak rows the caller can't touch).
1613                let mut response = RuntimeQueryResult::dml_result(
1614                    raw_query.to_string(),
1615                    0,
1616                    "update",
1617                    "runtime-dml-rls",
1618                );
1619                if let Some(items) = query.returning.clone() {
1620                    response.result = build_returning_result(&items, &[], None);
1621                }
1622                return Ok(response);
1623            };
1624            let mut augmented = query.clone();
1625            augmented.filter = Some(match augmented.filter.take() {
1626                Some(existing) => {
1627                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
1628                }
1629                None => policy,
1630            });
1631            augmented_query = augmented;
1632            &augmented_query
1633        } else {
1634            query
1635        };
1636
1637        // RETURNING wraps the inner executor and uses the touched-id
1638        // list the inner reports so the post-image reflects exactly
1639        // the rows the UPDATE actually mutated (not whatever a
1640        // separate SELECT might have observed).
1641        if let Some(items) = effective_query.returning.clone() {
1642            let mut inner_query = effective_query.clone();
1643            inner_query.returning = None;
1644            let (mut response, touched_ids) =
1645                self.execute_update_inner_tracked(raw_query, &inner_query)?;
1646
1647            let snapshots = if matches!(
1648                effective_query.target,
1649                UpdateTarget::Nodes | UpdateTarget::Edges
1650            ) {
1651                graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
1652            } else {
1653                super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
1654                    .row_snapshots(&touched_ids)
1655            };
1656
1657            response.result = build_returning_result(&items, &snapshots, None);
1658            response.engine = "runtime-dml-returning";
1659            return Ok(response);
1660        }
1661
1662        self.execute_update_inner(raw_query, effective_query)
1663    }
1664
1665    /// Back-compat shim: the older entry point ignored touched ids.
1666    fn execute_update_inner(
1667        &self,
1668        raw_query: &str,
1669        query: &UpdateQuery,
1670    ) -> RedDBResult<RuntimeQueryResult> {
1671        self.execute_update_inner_tracked(raw_query, query)
1672            .map(|(res, _)| res)
1673    }
1674
1675    fn execute_update_inner_tracked(
1676        &self,
1677        raw_query: &str,
1678        query: &UpdateQuery,
1679    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1680        let store = self.inner.db.store();
1681        let effective_filter = effective_update_filter(query);
1682        let compiled_plan = self.compile_update_plan(query)?;
1683        let mut touched_ids: Vec<EntityId> = Vec::new();
1684        let limit_cap = query.limit.map(|l| l as usize);
1685        let manager = store
1686            .get_collection(&query.table)
1687            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1688        let scan_limit = if query.order_by.is_empty() {
1689            limit_cap
1690        } else {
1691            None
1692        };
1693        let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
1694            self,
1695            &query.table,
1696            effective_filter.as_ref(),
1697            scan_limit,
1698            query.target,
1699        )
1700        .find_target_ids()?;
1701        let ids_to_update = if query.order_by.is_empty() {
1702            ids_to_update
1703        } else {
1704            ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
1705        };
1706
1707        if update_needs_rmw_lock(query) {
1708            return self.execute_update_inner_tracked_locked(
1709                raw_query,
1710                query,
1711                &compiled_plan,
1712                &ids_to_update,
1713                effective_filter.as_ref(),
1714            );
1715        }
1716
1717        let mut affected: u64 = 0;
1718        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1719            let mut applied_chunk = Vec::with_capacity(chunk.len());
1720            for entity in manager.get_many(chunk).into_iter().flatten() {
1721                let assignments =
1722                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1723                let applied = self.apply_materialized_update_for_entity(
1724                    query.table.clone(),
1725                    entity,
1726                    &compiled_plan,
1727                    assignments,
1728                )?;
1729                touched_ids.push(applied.id);
1730                applied_chunk.push(applied);
1731            }
1732            self.persist_update_chunk(&applied_chunk)?;
1733            affected += applied_chunk.len() as u64;
1734            let lsns = self.flush_update_chunk(&applied_chunk)?;
1735            if !query.suppress_events {
1736                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1737            }
1738        }
1739
1740        if affected > 0 {
1741            self.note_table_write(&query.table);
1742        }
1743
1744        Ok((
1745            RuntimeQueryResult::dml_result(
1746                raw_query.to_string(),
1747                affected,
1748                "update",
1749                "runtime-dml",
1750            ),
1751            touched_ids,
1752        ))
1753    }
1754
1755    fn execute_update_inner_tracked_locked(
1756        &self,
1757        raw_query: &str,
1758        query: &UpdateQuery,
1759        compiled_plan: &CompiledUpdatePlan,
1760        ids_to_update: &[EntityId],
1761        effective_filter: Option<&Filter>,
1762    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1763        let store = self.inner.db.store();
1764        let mut touched_ids = Vec::new();
1765        let mut lock_entries = Vec::new();
1766
1767        for id in ids_to_update {
1768            let Some(candidate) = store.get(&query.table, *id) else {
1769                continue;
1770            };
1771            let logical_id = candidate.logical_id();
1772            let lock_key = format!("row:{}", logical_id.raw());
1773            let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
1774            lock_entries.push((lock_key, logical_id, rmw_lock));
1775        }
1776
1777        lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
1778        lock_entries.dedup_by(|left, right| left.0 == right.0);
1779        let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();
1780
1781        let mut applied_chunk = Vec::new();
1782        for (_, logical_id, _) in &lock_entries {
1783            let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
1784            else {
1785                continue;
1786            };
1787            if let Some(filter) = effective_filter {
1788                if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
1789                    Some(self.inner.db.as_ref()),
1790                    &entity,
1791                    filter,
1792                    &query.table,
1793                    &query.table,
1794                ) {
1795                    continue;
1796                }
1797            }
1798
1799            let assignments =
1800                self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
1801            let applied = self.apply_materialized_update_for_entity(
1802                query.table.clone(),
1803                entity,
1804                compiled_plan,
1805                assignments,
1806            )?;
1807            touched_ids.push(applied.id);
1808            applied_chunk.push(applied);
1809        }
1810
1811        let affected = applied_chunk.len() as u64;
1812        if !applied_chunk.is_empty() {
1813            self.persist_update_chunk(&applied_chunk)?;
1814            let lsns = self.flush_update_chunk(&applied_chunk)?;
1815            if !query.suppress_events {
1816                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1817            }
1818        }
1819
1820        if affected > 0 {
1821            self.note_table_write(&query.table);
1822        }
1823
1824        Ok((
1825            RuntimeQueryResult::dml_result(
1826                raw_query.to_string(),
1827                affected,
1828                "update",
1829                "runtime-dml",
1830            ),
1831            touched_ids,
1832        ))
1833    }
1834
1835    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
1836        let mut static_field_assignments = Vec::new();
1837        let mut static_metadata_assignments = Vec::new();
1838        let mut dynamic_assignments = Vec::new();
1839        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
1840        let mut row_modified_columns = Vec::new();
1841
1842        for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
1843            let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
1844            let metadata_key = resolve_sql_ttl_metadata_key(column);
1845            if compound_op.is_some() && metadata_key.is_some() {
1846                return Err(RedDBError::Query(format!(
1847                    "compound assignment is only supported for row fields: {column}"
1848                )));
1849            }
1850            if compound_op.is_none() {
1851                if let Ok(value) = fold_expr_to_value(expr.clone()) {
1852                    if let Some(metadata_key) = metadata_key {
1853                        let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1854                        let (canonical_key, canonical_value) =
1855                            canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1856                        static_metadata_assignments
1857                            .push((canonical_key.to_string(), canonical_value));
1858                    } else {
1859                        let value = self.resolve_crypto_sentinel(value)?;
1860                        static_field_assignments.push((
1861                            column.clone(),
1862                            normalize_row_update_assignment_with_plan(
1863                                &query.table,
1864                                column,
1865                                value,
1866                                row_contract_plan.as_ref(),
1867                            )?,
1868                        ));
1869                        row_modified_columns.push(column.clone());
1870                    }
1871                    continue;
1872                }
1873            }
1874
1875            dynamic_assignments.push(CompiledUpdateAssignment {
1876                column: column.clone(),
1877                expr: expr.clone(),
1878                compound_op,
1879                metadata_key,
1880                row_rule: if metadata_key.is_none() {
1881                    if let Some(plan) = row_contract_plan.as_ref() {
1882                        if plan.timestamps_enabled
1883                            && (column == "created_at" || column == "updated_at")
1884                        {
1885                            return Err(RedDBError::Query(format!(
1886                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1887                                query.table, column
1888                            )));
1889                        }
1890                        if let Some(rule) = plan.declared_rules.get(column) {
1891                            Some(rule.clone())
1892                        } else if plan.strict_schema {
1893                            return Err(RedDBError::Query(format!(
1894                                "collection '{}' is strict and does not allow undeclared fields: {}",
1895                                query.table, column
1896                            )));
1897                        } else {
1898                            None
1899                        }
1900                    } else {
1901                        None
1902                    }
1903                } else {
1904                    None
1905                },
1906            });
1907            if metadata_key.is_none() {
1908                row_modified_columns.push(column.clone());
1909            }
1910        }
1911
1912        let row_modified_columns = dedupe_update_columns(row_modified_columns);
1913        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1914            row_modified_columns.iter().any(|column| {
1915                plan.unique_columns
1916                    .keys()
1917                    .any(|unique| unique.eq_ignore_ascii_case(column))
1918            })
1919        });
1920
1921        if let Some(ttl_ms) = query.ttl_ms {
1922            static_metadata_assignments
1923                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1924        }
1925        if let Some(expires_at_ms) = query.expires_at_ms {
1926            static_metadata_assignments.push((
1927                "_expires_at".to_string(),
1928                metadata_u64_to_value(expires_at_ms),
1929            ));
1930        }
1931        for (key, val) in &query.with_metadata {
1932            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1933        }
1934
1935        Ok(CompiledUpdatePlan {
1936            static_field_assignments,
1937            static_metadata_assignments,
1938            dynamic_assignments,
1939            row_contract_plan,
1940            row_modified_columns,
1941            row_touches_unique_columns,
1942        })
1943    }
1944
1945    fn materialize_update_assignments_for_entity(
1946        &self,
1947        query: &UpdateQuery,
1948        entity: &UnifiedEntity,
1949        compiled_plan: &CompiledUpdatePlan,
1950    ) -> RedDBResult<MaterializedUpdateAssignments> {
1951        let mut assignments = MaterializedUpdateAssignments::default();
1952        let mut record: Option<UnifiedRecord> = None;
1953
1954        for assignment in &compiled_plan.dynamic_assignments {
1955            if assignment.compound_op.is_some()
1956                && !matches!(
1957                    entity.data,
1958                    EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
1959                )
1960            {
1961                return Err(RedDBError::Query(format!(
1962                    "compound assignment is only supported for row or graph UPDATE column '{}'",
1963                    assignment.column
1964                )));
1965            }
1966            if record.is_none() {
1967                record = runtime_any_record_from_entity_ref(entity);
1968            }
1969            let Some(record) = record.as_ref() else {
1970                return Err(RedDBError::Query(format!(
1971                    "UPDATE could not materialize runtime record for entity {} in '{}'",
1972                    entity.id.raw(),
1973                    query.table
1974                )));
1975            };
1976            let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
1977                Some(self.inner.db.as_ref()),
1978                &assignment.expr,
1979                record,
1980                Some(query.table.as_str()),
1981                Some(query.table.as_str()),
1982            )
1983            .ok_or_else(|| {
1984                RedDBError::Query(format!(
1985                    "failed to evaluate UPDATE expression for column '{}'",
1986                    assignment.column
1987                ))
1988            })?;
1989            let value = if let Some(op) = assignment.compound_op {
1990                evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
1991            } else {
1992                rhs
1993            };
1994
1995            if let Some(metadata_key) = assignment.metadata_key {
1996                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1997                let (canonical_key, canonical_value) =
1998                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1999                assignments
2000                    .dynamic_metadata_assignments
2001                    .push((canonical_key.to_string(), canonical_value));
2002            } else {
2003                assignments.dynamic_field_assignments.push((
2004                    assignment.column.clone(),
2005                    normalize_row_update_value_for_rule(
2006                        &query.table,
2007                        self.resolve_crypto_sentinel(value)?,
2008                        assignment.row_rule.as_ref(),
2009                    )?,
2010                ));
2011            }
2012        }
2013
2014        Ok(assignments)
2015    }
2016
2017    fn apply_materialized_update_for_entity(
2018        &self,
2019        collection: String,
2020        entity: UnifiedEntity,
2021        compiled_plan: &CompiledUpdatePlan,
2022        assignments: MaterializedUpdateAssignments,
2023    ) -> RedDBResult<AppliedEntityMutation> {
2024        if matches!(entity.data, EntityData::Row(_)) {
2025            return self.apply_loaded_sql_update_row_core(
2026                collection,
2027                entity,
2028                &compiled_plan.static_field_assignments,
2029                assignments.dynamic_field_assignments,
2030                &compiled_plan.static_metadata_assignments,
2031                assignments.dynamic_metadata_assignments,
2032                compiled_plan.row_contract_plan.as_ref(),
2033                &compiled_plan.row_modified_columns,
2034                compiled_plan.row_touches_unique_columns,
2035            );
2036        }
2037
2038        ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;
2039
2040        let operations = build_patch_operations_from_materialized_assignments(
2041            &entity,
2042            compiled_plan,
2043            assignments,
2044        );
2045        self.apply_loaded_patch_entity_core(
2046            collection,
2047            entity,
2048            crate::json::Value::Null,
2049            operations,
2050        )
2051    }
2052
2053    /// Execute DELETE FROM table WHERE filter
2054    pub fn execute_delete(
2055        &self,
2056        raw_query: &str,
2057        query: &DeleteQuery,
2058    ) -> RedDBResult<RuntimeQueryResult> {
2059        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2060        // Issue #523 — blockchain collections are immutable; see
2061        // execute_update for the same gate.
2062        if crate::runtime::blockchain_kind::is_chain(self.inner.db.store().as_ref(), &query.table) {
2063            return Err(RedDBError::InvalidOperation(format!(
2064                "BlockchainCollectionImmutable: DELETE not allowed on '{}'",
2065                query.table
2066            )));
2067        }
2068        // CollectionContract gate (#50) — see execute_update for
2069        // rationale. The gate handles APPEND ONLY rejection and is
2070        // the single point where future contract bits land.
2071        crate::runtime::collection_contract::CollectionContractGate::check(
2072            self,
2073            &query.table,
2074            crate::runtime::collection_contract::MutationKind::Delete,
2075        )?;
2076
2077        // RETURNING on DELETE: capture the pre-image via an internal
2078        // SELECT that reuses the same WHERE, then run the delete with
2079        // the RETURNING clause stripped, then project the captured
2080        // rows through the requested items. The extra SELECT is a
2081        // pragmatic MVP — a future pass can fuse the scan with the
2082        // delete to avoid the second pass over the heap.
2083        if let Some(items) = query.returning.clone() {
2084            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
2085                RedDBError::Query(
2086                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
2087                )
2088            })?;
2089            let captured = self.execute_query(&select_sql)?;
2090
2091            let mut inner_query = query.clone();
2092            inner_query.returning = None;
2093            let _ = self.execute_delete(raw_query, &inner_query)?;
2094
2095            let snapshots: Vec<Vec<(String, Value)>> = captured
2096                .result
2097                .records
2098                .iter()
2099                .map(|rec| {
2100                    rec.iter_fields()
2101                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
2102                        .collect()
2103                })
2104                .collect();
2105            let affected = snapshots.len() as u64;
2106            let result = build_returning_result(&items, &snapshots, None);
2107
2108            let mut response = RuntimeQueryResult::dml_result(
2109                raw_query.to_string(),
2110                affected,
2111                "delete",
2112                "runtime-dml-returning",
2113            );
2114            response.result = result;
2115            return Ok(response);
2116        }
2117        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
2118        //
2119        // When the table has RLS enabled, gate the DELETE by the
2120        // per-role policy set: mutations only touch rows that *every*
2121        // matching `FOR DELETE` policy would accept. No policies =>
2122        // zero rows affected (PG restrictive-default).
2123        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
2124            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
2125                self,
2126                &query.table,
2127                crate::storage::query::ast::PolicyAction::Delete,
2128            );
2129            let Some(policy) = rls_filter else {
2130                return Ok(RuntimeQueryResult::dml_result(
2131                    raw_query.to_string(),
2132                    0,
2133                    "delete",
2134                    "runtime-dml-rls",
2135                ));
2136            };
2137            // Fold the policy predicate into the user's WHERE before
2138            // dispatching — the remainder of this function reads the
2139            // filter from `query` via `effective_delete_filter`, which
2140            // respects the updated value.
2141            let mut augmented = query.clone();
2142            augmented.filter = Some(match augmented.filter.take() {
2143                Some(existing) => {
2144                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
2145                }
2146                None => policy,
2147            });
2148            return self.execute_delete_inner(raw_query, &augmented);
2149        }
2150        self.execute_delete_inner(raw_query, query)
2151    }
2152
2153    fn execute_delete_inner(
2154        &self,
2155        raw_query: &str,
2156        query: &DeleteQuery,
2157    ) -> RedDBResult<RuntimeQueryResult> {
2158        let effective_filter = effective_delete_filter(query);
2159
2160        // Find the rows that match the WHERE clause. The "find target
2161        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
2162        // the same scan strategy.
2163        let scan = super::dml_target_scan::DmlTargetScan::new(
2164            self,
2165            &query.table,
2166            effective_filter.as_ref(),
2167            None,
2168        );
2169        let ids_to_delete = scan.find_target_ids()?;
2170
2171        // For event-enabled collections, snapshot the pre-delete state
2172        // before rows are physically removed.
2173        let needs_delete_events =
2174            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
2175        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
2176            scan.row_json_pre_images(&ids_to_delete)
2177        } else {
2178            HashMap::new()
2179        };
2180
2181        let mut affected: u64 = 0;
2182        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
2183            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
2184            affected += count;
2185            if needs_delete_events && !lsns.is_empty() {
2186                // lsns.len() == actually-deleted entities; align with chunk ids.
2187                // `delete_batch` may skip missing entities, so we correlate by
2188                // the number returned (they're emitted in chunk order).
2189                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
2190                self.emit_delete_events_for_collection(
2191                    &query.table,
2192                    deleted_chunk,
2193                    &lsns,
2194                    &pre_images,
2195                )?;
2196            }
2197        }
2198        pre_images.clear();
2199
2200        if affected > 0 {
2201            self.note_table_write(&query.table);
2202        }
2203
2204        Ok(RuntimeQueryResult::dml_result(
2205            raw_query.to_string(),
2206            affected,
2207            "delete",
2208            "runtime-dml",
2209        ))
2210    }
2211}
2212
2213/// Reject UPDATE … NODES/EDGES that assign to graph identity/topology
2214/// columns regardless of whether any row matches the WHERE clause. The
2215/// per-entity guard below covers only the matched-rows case, but ADR 0019
2216/// declares these columns immutable on the surface itself, so a zero-row
2217/// UPDATE should still surface the same error to operators and SDKs.
2218fn ensure_graph_identity_update_target_allowed(query: &UpdateQuery) -> RedDBResult<()> {
2219    if !matches!(query.target, UpdateTarget::Nodes | UpdateTarget::Edges) {
2220        return Ok(());
2221    }
2222    for (column, _) in &query.assignment_exprs {
2223        if is_immutable_graph_identity_field(column) {
2224            return Err(RedDBError::Query(format!(
2225                "immutable graph field '{column}' cannot be updated"
2226            )));
2227        }
2228    }
2229    Ok(())
2230}
2231
2232fn ensure_graph_identity_update_allowed(
2233    entity: &UnifiedEntity,
2234    compiled_plan: &CompiledUpdatePlan,
2235    assignments: &MaterializedUpdateAssignments,
2236) -> RedDBResult<()> {
2237    if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
2238        return Ok(());
2239    }
2240
2241    for (column, _) in compiled_plan
2242        .static_field_assignments
2243        .iter()
2244        .chain(assignments.dynamic_field_assignments.iter())
2245    {
2246        if is_immutable_graph_identity_field(column) {
2247            return Err(RedDBError::Query(format!(
2248                "immutable graph field '{column}' cannot be updated"
2249            )));
2250        }
2251    }
2252
2253    Ok(())
2254}
2255
/// Report whether `column` names one of the graph identity/topology fields
/// that UPDATE may not assign to. The comparison ignores ASCII case.
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const IMMUTABLE_FIELDS: [&str; 6] = ["rid", "label", "from_rid", "to_rid", "from", "to"];

    for reserved in IMMUTABLE_FIELDS {
        if reserved.eq_ignore_ascii_case(column) {
            return true;
        }
    }
    false
}
2261
2262fn build_patch_operations_from_materialized_assignments(
2263    entity: &UnifiedEntity,
2264    compiled_plan: &CompiledUpdatePlan,
2265    assignments: MaterializedUpdateAssignments,
2266) -> Vec<PatchEntityOperation> {
2267    let mut operations = Vec::with_capacity(
2268        compiled_plan.static_field_assignments.len()
2269            + compiled_plan.static_metadata_assignments.len()
2270            + assignments.dynamic_field_assignments.len()
2271            + assignments.dynamic_metadata_assignments.len(),
2272    );
2273
2274    for (column, value) in &compiled_plan.static_field_assignments {
2275        operations.push(PatchEntityOperation {
2276            op: PatchEntityOperationType::Set,
2277            path: update_patch_path_for_entity(entity, column),
2278            value: Some(storage_value_to_json(value)),
2279        });
2280    }
2281
2282    for (column, value) in assignments.dynamic_field_assignments {
2283        operations.push(PatchEntityOperation {
2284            op: PatchEntityOperationType::Set,
2285            path: update_patch_path_for_entity(entity, &column),
2286            value: Some(storage_value_to_json(&value)),
2287        });
2288    }
2289
2290    for (key, value) in &compiled_plan.static_metadata_assignments {
2291        operations.push(PatchEntityOperation {
2292            op: PatchEntityOperationType::Set,
2293            path: vec!["metadata".to_string(), key.clone()],
2294            value: Some(metadata_value_to_json(value)),
2295        });
2296    }
2297
2298    for (key, value) in assignments.dynamic_metadata_assignments {
2299        operations.push(PatchEntityOperation {
2300            op: PatchEntityOperationType::Set,
2301            path: vec!["metadata".to_string(), key],
2302            value: Some(metadata_value_to_json(&value)),
2303        });
2304    }
2305
2306    operations
2307}
2308
2309fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
2310    if matches!(
2311        (&entity.kind, &entity.data),
2312        (
2313            crate::storage::EntityKind::GraphNode(_),
2314            EntityData::Node(_)
2315        )
2316    ) && column.eq_ignore_ascii_case("node_type")
2317    {
2318        return vec!["node_type".to_string()];
2319    }
2320    if matches!(
2321        (&entity.kind, &entity.data),
2322        (
2323            crate::storage::EntityKind::GraphEdge(_),
2324            EntityData::Edge(_)
2325        )
2326    ) && column.eq_ignore_ascii_case("weight")
2327    {
2328        return vec!["weight".to_string()];
2329    }
2330    vec!["fields".to_string(), column.to_string()]
2331}
2332
/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
/// capture the pre-image before actually removing the rows.
///
/// Keywords are matched case-insensitively and may be separated by any
/// ASCII whitespace (spaces, tabs, newlines), so multi-line statements
/// are rewritten as well. Everything between `FROM` and the first
/// top-level `RETURNING` token is copied verbatim (original casing
/// included), so WHERE / ORDER BY / LIMIT survive untouched; only the
/// surrounding whitespace is trimmed.
///
/// Returns `None` when the statement does not start with `DELETE`
/// followed by whitespace, has no top-level `FROM` keyword, or has
/// nothing left after `FROM`.
fn delete_to_select_sql(sql: &str) -> Option<String> {
    const DELETE_KW: &str = "delete";
    const FROM_KW: &str = "from";

    let trimmed = sql.trim_start();
    // ASCII lowercasing never changes byte lengths, so an offset found in
    // `lowered` addresses the same character in `trimmed`.
    let lowered = trimmed.to_ascii_lowercase();

    let after_delete = lowered.strip_prefix(DELETE_KW)?;
    if !after_delete.starts_with(|c: char| c.is_ascii_whitespace()) {
        return None;
    }

    // Locate `FROM` after `DELETE`, tolerating any whitespace around it.
    let from_pos = DELETE_KW.len() + find_top_level_keyword(after_delete, FROM_KW)?;
    let body_start = from_pos + FROM_KW.len();
    let body = &trimmed[body_start..];
    let body_lc = &lowered[body_start..];

    // Cut off the RETURNING tail — the clause appears at most once per
    // statement at top level in our grammar.
    let body = match find_top_level_keyword(body_lc, "returning") {
        Some(pos) => &body[..pos],
        None => body,
    };

    let body = body.trim();
    if body.is_empty() {
        return None;
    }
    Some(format!("SELECT * FROM {body}"))
}

/// Find the byte offset of a whitespace-bounded keyword in a
/// lowercased haystack, skipping matches inside quoted regions.
///
/// Both single-quoted and double-quoted regions are skipped, and a quote
/// character of the other kind inside a region does not end it. A doubled
/// quote (`''`) closes and immediately reopens the region, which keeps the
/// scanner inside the literal. There is no other escape handling — enough
/// for the shapes the DML parser emits.
///
/// Returns `None` when the keyword does not occur at top level or when
/// `needle` is empty.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let needle = needle.as_bytes();
    if needle.is_empty() {
        return None;
    }

    // The quote byte that opened the region we are currently inside, if any.
    let mut open_quote: Option<u8> = None;

    for (i, &c) in bytes.iter().enumerate() {
        match open_quote {
            Some(quote) => {
                if c == quote {
                    open_quote = None;
                }
            }
            None if c == b'\'' || c == b'"' => open_quote = Some(c),
            None => {
                let starts_token = i == 0 || bytes[i - 1].is_ascii_whitespace();
                if starts_token && bytes[i..].starts_with(needle) {
                    let end = i + needle.len();
                    if end == bytes.len() || bytes[end].is_ascii_whitespace() {
                        return Some(i);
                    }
                }
            }
        }
    }
    None
}
2392
2393/// Build a `UnifiedResult` from the rows affected by a DML statement plus
2394/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
2395/// for one affected row; `outputs`, when provided, supplies the engine-
2396/// assigned entity id for the same row (INSERT path). Projection honours
2397/// the RETURNING items: `*` expands to every snapshot column plus
2398/// the public row envelope when available.
2399fn build_returning_result(
2400    items: &[ReturningItem],
2401    snapshots: &[Vec<(String, Value)>],
2402    outputs: Option<&[CreateEntityOutput]>,
2403) -> UnifiedResult {
2404    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
2405    let public_item_outputs = outputs.is_some_and(|outs| {
2406        outs.first()
2407            .and_then(|out| out.entity.as_ref())
2408            .is_some_and(|entity| public_returning_item_kind(entity).is_some())
2409    });
2410
2411    let mut columns: Vec<String> = if project_all {
2412        let mut cols: Vec<String> = Vec::new();
2413        if public_item_outputs {
2414            cols.extend(
2415                [
2416                    "rid",
2417                    "collection",
2418                    "kind",
2419                    "tenant",
2420                    "created_at",
2421                    "updated_at",
2422                ]
2423                .into_iter()
2424                .map(str::to_string),
2425            );
2426        } else if outputs.is_some() {
2427            cols.push("red_entity_id".to_string());
2428        }
2429        if let Some(first) = snapshots.first() {
2430            for (name, _) in first {
2431                cols.push(name.clone());
2432            }
2433        }
2434        cols
2435    } else {
2436        items
2437            .iter()
2438            .filter_map(|it| match it {
2439                ReturningItem::Column(c) => Some(c.clone()),
2440                ReturningItem::All => None,
2441            })
2442            .collect()
2443    };
2444    // Guarantee unique order-preserving column list.
2445    {
2446        let mut seen = std::collections::HashSet::new();
2447        columns.retain(|c| seen.insert(c.clone()));
2448    }
2449
2450    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
2451    for (idx, snap) in snapshots.iter().enumerate() {
2452        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
2453        if let Some(outs) = outputs {
2454            if let Some(out) = outs.get(idx) {
2455                if let Some(entity) = out.entity.as_ref() {
2456                    if let Some(kind) = public_returning_item_kind(entity) {
2457                        values.insert(
2458                            Arc::clone(&sys_key_rid()),
2459                            Value::UnsignedInteger(out.id.raw()),
2460                        );
2461                        values.insert(
2462                            Arc::clone(&sys_key_collection()),
2463                            Value::text(entity.kind.collection().to_string()),
2464                        );
2465                        values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
2466                        values.insert(
2467                            Arc::clone(&sys_key_created_at()),
2468                            Value::UnsignedInteger(entity.created_at),
2469                        );
2470                        values.insert(
2471                            Arc::clone(&sys_key_updated_at()),
2472                            Value::UnsignedInteger(entity.updated_at),
2473                        );
2474                        // Legacy alias: an explicit `RETURNING red_entity_id`
2475                        // still resolves to the row's rid. Only surfaces when
2476                        // the projected column list names it — `RETURNING *`
2477                        // keeps the envelope clean (rid, not red_entity_id).
2478                        values.insert(
2479                            Arc::clone(&sys_key_red_entity_id()),
2480                            Value::UnsignedInteger(out.id.raw()),
2481                        );
2482                    } else {
2483                        values.insert(
2484                            Arc::clone(&sys_key_red_entity_id()),
2485                            Value::Integer(out.id.raw() as i64),
2486                        );
2487                    }
2488                } else {
2489                    values.insert(
2490                        Arc::clone(&sys_key_red_entity_id()),
2491                        Value::Integer(out.id.raw() as i64),
2492                    );
2493                }
2494            }
2495        }
2496        for (name, val) in snap {
2497            values.insert(Arc::from(name.as_str()), val.clone());
2498        }
2499        if !values.contains_key("tenant") {
2500            let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
2501            values.insert(Arc::clone(&sys_key_tenant()), tenant);
2502        }
2503        let mut rec = UnifiedRecord::default();
2504        // Only keep projected columns on the record.
2505        for col in &columns {
2506            if let Some(v) = values.get(col.as_str()) {
2507                rec.set_arc(Arc::from(col.as_str()), v.clone());
2508            }
2509        }
2510        records.push(rec);
2511    }
2512
2513    UnifiedResult {
2514        columns,
2515        records,
2516        stats: Default::default(),
2517        pre_serialized_json: None,
2518    }
2519}
2520
2521fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2522    match (&entity.kind, &entity.data) {
2523        (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2524            Some("node")
2525        }
2526        (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2527            Some("edge")
2528        }
2529        (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2530        _ => None,
2531    }
2532}
2533
2534fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2535    let Some(row) = entity.data.as_row() else {
2536        return "row";
2537    };
2538
2539    let is_kv = row.named.as_ref().is_some_and(|named| {
2540        (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2541            || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2542    });
2543    if is_kv {
2544        return "kv";
2545    }
2546
2547    let is_document = row
2548        .named
2549        .as_ref()
2550        .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2551        || row.columns.iter().any(runtime_returning_documentish_value);
2552    if is_document {
2553        "document"
2554    } else {
2555        "row"
2556    }
2557}
2558
2559fn runtime_returning_documentish_value(value: &Value) -> bool {
2560    matches!(value, Value::Json(_) | Value::Blob(_))
2561}
2562
2563fn row_insert_returning_snapshots(
2564    outputs: &[CreateEntityOutput],
2565    fallback: Vec<Vec<(String, Value)>>,
2566) -> Vec<Vec<(String, Value)>> {
2567    outputs
2568        .iter()
2569        .enumerate()
2570        .map(|(idx, out)| {
2571            out.entity
2572                .as_ref()
2573                .map(entity_row_fields_snapshot)
2574                .filter(|snap| !snap.is_empty())
2575                .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2576        })
2577        .collect()
2578}
2579
2580fn graph_insert_returning_snapshots(
2581    store: &crate::storage::unified::UnifiedStore,
2582    collection: &str,
2583    ids: &[EntityId],
2584) -> Vec<Vec<(String, Value)>> {
2585    let Some(manager) = store.get_collection(collection) else {
2586        return Vec::new();
2587    };
2588
2589    ids.iter()
2590        .filter_map(|id| manager.get(*id))
2591        .filter_map(|entity| {
2592            let mut record = runtime_any_record_from_entity_ref(&entity)?;
2593            record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2594            Some(record)
2595        })
2596        .map(|record| {
2597            record
2598                .iter_fields()
2599                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2600                .collect()
2601        })
2602        .collect()
2603}
2604
2605fn graph_update_returning_snapshots(
2606    runtime: &RedDBRuntime,
2607    collection: &str,
2608    ids: &[EntityId],
2609) -> Vec<Vec<(String, Value)>> {
2610    let store = runtime.db().store();
2611    let Some(manager) = store.get_collection(collection) else {
2612        return Vec::new();
2613    };
2614
2615    manager
2616        .get_many(ids)
2617        .into_iter()
2618        .flatten()
2619        .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2620        .map(|record| {
2621            record
2622                .iter_fields()
2623                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2624                .collect()
2625        })
2626        .collect()
2627}
2628
2629fn ensure_update_target_contract(
2630    runtime: &RedDBRuntime,
2631    collection: &str,
2632    target: UpdateTarget,
2633) -> RedDBResult<()> {
2634    let Some(contract) = runtime.db().collection_contract(collection) else {
2635        return Ok(());
2636    };
2637    if update_target_contract_is_advisory(&contract)
2638        || update_target_allows_model(contract.declared_model, update_target_model(target))
2639    {
2640        return Ok(());
2641    }
2642    Err(RedDBError::InvalidOperation(format!(
2643        "collection '{}' is declared as '{}' and does not allow '{}' updates",
2644        collection,
2645        update_model_name(contract.declared_model),
2646        update_model_name(update_target_model(target))
2647    )))
2648}
2649
2650fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2651    matches!(
2652        (&contract.origin, &contract.schema_mode),
2653        (
2654            crate::physical::ContractOrigin::Implicit,
2655            crate::catalog::SchemaMode::Dynamic,
2656        )
2657    )
2658}
2659
2660fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2661    match target {
2662        UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2663        UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2664        UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2665        UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2666    }
2667}
2668
2669fn update_target_allows_model(
2670    declared_model: crate::catalog::CollectionModel,
2671    requested_model: crate::catalog::CollectionModel,
2672) -> bool {
2673    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2674}
2675
2676fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2677    match model {
2678        crate::catalog::CollectionModel::Table => "table",
2679        crate::catalog::CollectionModel::Document => "document",
2680        crate::catalog::CollectionModel::Graph => "graph",
2681        crate::catalog::CollectionModel::Vector => "vector",
2682        crate::catalog::CollectionModel::Hll => "hll",
2683        crate::catalog::CollectionModel::Sketch => "sketch",
2684        crate::catalog::CollectionModel::Filter => "filter",
2685        crate::catalog::CollectionModel::Kv => "kv",
2686        crate::catalog::CollectionModel::Config => "config",
2687        crate::catalog::CollectionModel::Vault => "vault",
2688        crate::catalog::CollectionModel::Mixed => "mixed",
2689        crate::catalog::CollectionModel::TimeSeries => "timeseries",
2690        crate::catalog::CollectionModel::Queue => "queue",
2691        crate::catalog::CollectionModel::Metrics => "metrics",
2692    }
2693}
2694
2695fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
2696    let db = runtime.db();
2697    if let Some(contract) = db.collection_contract(collection) {
2698        let advisory_implicit_dynamic = matches!(
2699            (&contract.origin, &contract.schema_mode),
2700            (
2701                crate::physical::ContractOrigin::Implicit,
2702                crate::catalog::SchemaMode::Dynamic,
2703            )
2704        );
2705        if advisory_implicit_dynamic
2706            || matches!(
2707                contract.declared_model,
2708                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
2709            )
2710        {
2711            return Ok(());
2712        }
2713        return Err(RedDBError::InvalidOperation(format!(
2714            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
2715            collection, contract.declared_model
2716        )));
2717    }
2718
2719    let now = std::time::SystemTime::now()
2720        .duration_since(std::time::UNIX_EPOCH)
2721        .unwrap_or_default()
2722        .as_millis();
2723    db.save_collection_contract(crate::physical::CollectionContract {
2724        name: collection.to_string(),
2725        declared_model: crate::catalog::CollectionModel::Graph,
2726        schema_mode: crate::catalog::SchemaMode::Dynamic,
2727        origin: crate::physical::ContractOrigin::Implicit,
2728        version: 1,
2729        created_at_unix_ms: now,
2730        updated_at_unix_ms: now,
2731        default_ttl_ms: db.collection_default_ttl_ms(collection),
2732        vector_dimension: None,
2733        vector_metric: None,
2734        context_index_fields: Vec::new(),
2735        declared_columns: Vec::new(),
2736        table_def: None,
2737        timestamps_enabled: false,
2738        context_index_enabled: false,
2739        metrics_raw_retention_ms: None,
2740        metrics_rollup_policies: Vec::new(),
2741        metrics_tenant_identity: None,
2742        metrics_namespace: None,
2743        append_only: false,
2744        subscriptions: Vec::new(),
2745        session_key: None,
2746        session_gap_ms: None,
2747        retention_duration_ms: None,
2748    })
2749    .map(|_| ())
2750    .map_err(|err| RedDBError::Internal(err.to_string()))
2751}
2752
2753fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2754    query
2755        .assignment_exprs
2756        .iter()
2757        .enumerate()
2758        .any(|(idx, (column, expr))| {
2759            query
2760                .compound_assignment_ops
2761                .get(idx)
2762                .is_some_and(|op| op.is_some())
2763                || expr_references_update_column(expr, &query.table, column)
2764        })
2765}
2766
2767fn evaluate_compound_update_assignment(
2768    column: &str,
2769    record: &UnifiedRecord,
2770    op: BinOp,
2771    rhs: Value,
2772) -> RedDBResult<Value> {
2773    let lhs = record.get(column).ok_or_else(|| {
2774        RedDBError::Query(format!(
2775            "compound assignment requires existing numeric field '{column}'"
2776        ))
2777    })?;
2778    if matches!(lhs, Value::Null) {
2779        return Err(RedDBError::Query(format!(
2780            "compound assignment requires non-null numeric field '{column}'"
2781        )));
2782    }
2783    apply_compound_numeric_op(column, op, lhs, &rhs)
2784}
2785
2786fn apply_compound_numeric_op(
2787    column: &str,
2788    op: BinOp,
2789    lhs: &Value,
2790    rhs: &Value,
2791) -> RedDBResult<Value> {
2792    let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
2793        return Err(RedDBError::Query(format!(
2794            "compound assignment requires numeric field '{column}'"
2795        )));
2796    };
2797    let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
2798        return Err(RedDBError::Query(format!(
2799            "compound assignment requires numeric right-hand value for field '{column}'"
2800        )));
2801    };
2802
2803    if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
2804        let a = lhs_number.as_f64();
2805        let b = rhs_number.as_f64();
2806        let out = match op {
2807            BinOp::Add => a + b,
2808            BinOp::Sub => a - b,
2809            BinOp::Mul => a * b,
2810            BinOp::Div => {
2811                if b == 0.0 {
2812                    return Err(RedDBError::Query(format!(
2813                        "division by zero in compound assignment for field '{column}'"
2814                    )));
2815                }
2816                a / b
2817            }
2818            BinOp::Mod => {
2819                if b == 0.0 {
2820                    return Err(RedDBError::Query(format!(
2821                        "modulo by zero in compound assignment for field '{column}'"
2822                    )));
2823                }
2824                a % b
2825            }
2826            _ => {
2827                return Err(RedDBError::Query(format!(
2828                    "unsupported compound assignment operator for field '{column}'"
2829                )));
2830            }
2831        };
2832        if !out.is_finite() {
2833            return Err(RedDBError::Query(format!(
2834                "numeric overflow in compound assignment for field '{column}'"
2835            )));
2836        }
2837        return Ok(Value::Float(out));
2838    }
2839
2840    let a = lhs_number.as_i128();
2841    let b = rhs_number.as_i128();
2842    let out = match op {
2843        BinOp::Add => a.checked_add(b),
2844        BinOp::Sub => a.checked_sub(b),
2845        BinOp::Mul => a.checked_mul(b),
2846        BinOp::Mod => {
2847            if b == 0 {
2848                return Err(RedDBError::Query(format!(
2849                    "modulo by zero in compound assignment for field '{column}'"
2850                )));
2851            }
2852            a.checked_rem(b)
2853        }
2854        BinOp::Div => unreachable!("integer division is handled by the float branch"),
2855        _ => None,
2856    }
2857    .ok_or_else(|| {
2858        RedDBError::Query(format!(
2859            "numeric overflow in compound assignment for field '{column}'"
2860        ))
2861    })?;
2862
2863    if matches!(lhs, Value::UnsignedInteger(_)) {
2864        let value = u64::try_from(out).map_err(|_| {
2865            RedDBError::Query(format!(
2866                "numeric overflow in compound assignment for field '{column}'"
2867            ))
2868        })?;
2869        Ok(Value::UnsignedInteger(value))
2870    } else {
2871        let value = i64::try_from(out).map_err(|_| {
2872            RedDBError::Query(format!(
2873                "numeric overflow in compound assignment for field '{column}'"
2874            ))
2875        })?;
2876        Ok(Value::Integer(value))
2877    }
2878}
2879
2880#[derive(Clone, Copy)]
2881enum CompoundNumber {
2882    Integer(i128),
2883    Float(f64),
2884}
2885
2886impl CompoundNumber {
2887    fn from_value(value: &Value) -> Option<Self> {
2888        match value {
2889            Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
2890            Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
2891            Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
2892            Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
2893            _ => None,
2894        }
2895    }
2896
2897    fn is_float(self) -> bool {
2898        matches!(self, Self::Float(_))
2899    }
2900
2901    fn as_f64(self) -> f64 {
2902        match self {
2903            Self::Integer(value) => value as f64,
2904            Self::Float(value) => value,
2905        }
2906    }
2907
2908    fn as_i128(self) -> i128 {
2909        match self {
2910            Self::Integer(value) => value,
2911            Self::Float(_) => unreachable!("float compound number used as integer"),
2912        }
2913    }
2914}
2915
2916fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2917    match expr {
2918        Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2919        Expr::Column { field, .. } => {
2920            field_ref_matches_update_column(field, table_name, target_column)
2921        }
2922        Expr::BinaryOp { lhs, rhs, .. } => {
2923            expr_references_update_column(lhs, table_name, target_column)
2924                || expr_references_update_column(rhs, table_name, target_column)
2925        }
2926        Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2927            expr_references_update_column(operand, table_name, target_column)
2928        }
2929        Expr::FunctionCall { args, .. } => args
2930            .iter()
2931            .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2932        Expr::Case {
2933            branches, else_, ..
2934        } => {
2935            branches.iter().any(|(cond, value)| {
2936                expr_references_update_column(cond, table_name, target_column)
2937                    || expr_references_update_column(value, table_name, target_column)
2938            }) || else_
2939                .as_deref()
2940                .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2941        }
2942        Expr::IsNull { operand, .. } => {
2943            expr_references_update_column(operand, table_name, target_column)
2944        }
2945        Expr::InList { target, values, .. } => {
2946            expr_references_update_column(target, table_name, target_column)
2947                || values
2948                    .iter()
2949                    .any(|value| expr_references_update_column(value, table_name, target_column))
2950        }
2951        Expr::Between {
2952            target, low, high, ..
2953        } => {
2954            expr_references_update_column(target, table_name, target_column)
2955                || expr_references_update_column(low, table_name, target_column)
2956                || expr_references_update_column(high, table_name, target_column)
2957        }
2958        Expr::WindowFunctionCall { args, window, .. } => {
2959            args.iter()
2960                .any(|arg| expr_references_update_column(arg, table_name, target_column))
2961                || window
2962                    .partition_by
2963                    .iter()
2964                    .any(|e| expr_references_update_column(e, table_name, target_column))
2965                || window
2966                    .order_by
2967                    .iter()
2968                    .any(|o| expr_references_update_column(&o.expr, table_name, target_column))
2969        }
2970    }
2971}
2972
2973fn field_ref_matches_update_column(
2974    field: &FieldRef,
2975    table_name: &str,
2976    target_column: &str,
2977) -> bool {
2978    match field {
2979        FieldRef::TableColumn { table, column } => {
2980            column.eq_ignore_ascii_case(target_column)
2981                && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2982        }
2983        FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2984            false
2985        }
2986    }
2987}
2988
2989fn resolve_update_entity_by_logical_id(
2990    runtime: &RedDBRuntime,
2991    table: &str,
2992    logical_id: EntityId,
2993) -> Option<UnifiedEntity> {
2994    let store = runtime.inner.db.store();
2995    if let Some(entity) =
2996        crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2997            .resolve_logical_id(&store, table, logical_id)
2998    {
2999        return Some(entity);
3000    }
3001    // Fallback for non-table-row entities (graph nodes/edges, etc.) where
3002    // entity_id == logical_id and the MVCC table-row resolver doesn't apply.
3003    store.get(table, logical_id)
3004}
3005
3006fn update_cdc_item_kind(
3007    runtime: &RedDBRuntime,
3008    collection: &str,
3009    entity: &UnifiedEntity,
3010) -> &'static str {
3011    match &entity.data {
3012        EntityData::Node(_) => return "node",
3013        EntityData::Edge(_) => return "edge",
3014        _ => {}
3015    }
3016
3017    match runtime
3018        .db()
3019        .collection_contract(collection)
3020        .map(|contract| contract.declared_model)
3021    {
3022        Some(crate::catalog::CollectionModel::Document) => "document",
3023        Some(crate::catalog::CollectionModel::Kv)
3024        | Some(crate::catalog::CollectionModel::Vault) => "kv",
3025        _ => "row",
3026    }
3027}
3028
3029fn ordered_update_target_ids(
3030    manager: &Arc<crate::storage::SegmentManager>,
3031    entity_ids: &[EntityId],
3032    order_by: &[OrderByClause],
3033    limit: Option<usize>,
3034) -> Vec<EntityId> {
3035    let mut entities: Vec<UnifiedEntity> =
3036        manager.get_many(entity_ids).into_iter().flatten().collect();
3037    entities.sort_by(|left, right| compare_update_order(left, right, order_by));
3038    if let Some(limit) = limit {
3039        entities.truncate(limit);
3040    }
3041    entities.into_iter().map(|entity| entity.id).collect()
3042}
3043
3044fn compare_update_order(
3045    left: &UnifiedEntity,
3046    right: &UnifiedEntity,
3047    order_by: &[OrderByClause],
3048) -> Ordering {
3049    for clause in order_by {
3050        let left_value = update_order_value(left, &clause.field);
3051        let right_value = update_order_value(right, &clause.field);
3052        let ordering = compare_update_order_values(
3053            left_value.as_ref(),
3054            right_value.as_ref(),
3055            clause.nulls_first,
3056        );
3057        if ordering != Ordering::Equal {
3058            return if clause.ascending {
3059                ordering
3060            } else {
3061                ordering.reverse()
3062            };
3063        }
3064    }
3065    left.logical_id().raw().cmp(&right.logical_id().raw())
3066}
3067
3068fn compare_update_order_values(
3069    left: Option<&Value>,
3070    right: Option<&Value>,
3071    nulls_first: bool,
3072) -> Ordering {
3073    match (left, right) {
3074        (None, None) => Ordering::Equal,
3075        (None, Some(_)) => {
3076            if nulls_first {
3077                Ordering::Less
3078            } else {
3079                Ordering::Greater
3080            }
3081        }
3082        (Some(_), None) => {
3083            if nulls_first {
3084                Ordering::Greater
3085            } else {
3086                Ordering::Less
3087            }
3088        }
3089        (Some(left), Some(right)) => {
3090            crate::storage::query::value_compare::total_compare_values(left, right)
3091        }
3092    }
3093}
3094
3095fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
3096    let FieldRef::TableColumn { table, column } = field else {
3097        return None;
3098    };
3099    if !table.is_empty() {
3100        return None;
3101    }
3102    if column.eq_ignore_ascii_case("rid") {
3103        return Some(Value::UnsignedInteger(entity.logical_id().raw()));
3104    }
3105    match &entity.data {
3106        EntityData::Row(row) => row.get_field(column).cloned(),
3107        EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
3108            .and_then(|record| record.get(column).cloned()),
3109        _ => None,
3110    }
3111}
3112
/// Removes duplicate column names, comparing ASCII case-insensitively.
///
/// The first spelling of each name is kept and the relative order of the
/// surviving names is preserved.
fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
    // `columns[..kept]` always holds the unique names found so far, in order.
    let mut kept = 0;
    for idx in 0..columns.len() {
        let already_seen = columns[..kept]
            .iter()
            .any(|existing| existing.eq_ignore_ascii_case(&columns[idx]));
        if !already_seen {
            // Everything in `kept..idx` is a known duplicate, so swapping one
            // of them out of the prefix slot keeps the unique prefix ordered.
            columns.swap(kept, idx);
            kept += 1;
        }
    }
    columns.truncate(kept);
    columns
}
3129
3130// =============================================================================
3131// Helper functions for extracting typed values from column/value pairs
3132// =============================================================================
3133
/// Canonical spellings of the SQL columns that are stored as TTL metadata
/// rather than as regular fields.
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Maps `column` to its canonical TTL metadata key when it names one of
/// `SQL_TTL_METADATA_COLUMNS` (ASCII case-insensitive); `None` otherwise.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|canonical| column.eq_ignore_ascii_case(canonical))
}
3147
3148/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
3149/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
3150/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
3151/// `_ttl_ms` and `_expires_at` are passed through.
3152fn canonicalize_sql_ttl_metadata(
3153    key: &'static str,
3154    value: MetadataValue,
3155) -> (&'static str, MetadataValue) {
3156    if key != "_ttl" {
3157        return (key, value);
3158    }
3159    let scaled = match value {
3160        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
3161        MetadataValue::Timestamp(ms_or_s) => {
3162            // Timestamp is already chosen for very large values; treat as
3163            // already-ms to avoid silent overflow.
3164            MetadataValue::Timestamp(ms_or_s)
3165        }
3166        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
3167        other => other,
3168    };
3169    ("_ttl_ms", scaled)
3170}
3171
/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
/// `SECRET('...')` literals. The runtime strips this marker and
/// applies the actual crypto transform during INSERT execution.
///
/// A `Value::Password` / `Value::Secret` payload that does not start with
/// this prefix is passed through unchanged by `resolve_crypto_sentinel`.
pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
3176
3177impl RedDBRuntime {
3178    /// Strip the plaintext sentinel from a `Value::Password` or
3179    /// `Value::Secret` produced by the parser and apply the real
3180    /// crypto transform. `Password` is always hashed with argon2id.
3181    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
3182    /// when `red.config.secret.auto_encrypt = true` (default).
3183    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
3184        match value {
3185            Value::Password(marked) => {
3186                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
3187                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
3188                } else {
3189                    Ok(Value::Password(marked))
3190                }
3191            }
3192            Value::Secret(bytes) => {
3193                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
3194                    if !self.secret_auto_encrypt() {
3195                        return Err(RedDBError::Query(
3196                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
3197                             is false. Insert pre-encrypted bytes directly instead."
3198                                .to_string(),
3199                        ));
3200                    }
3201                    let key = self.secret_aes_key().ok_or_else(|| {
3202                        RedDBError::Query(
3203                            "SECRET() column encryption requires a bootstrapped \
3204                             vault (red.secret.aes_key is missing). Start the server \
3205                             with --vault to enable."
3206                                .to_string(),
3207                        )
3208                    })?;
3209                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
3210                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
3211                } else {
3212                    Ok(Value::Secret(bytes))
3213                }
3214            }
3215            other => Ok(other),
3216        }
3217    }
3218}
3219
3220/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
3221/// This is the on-disk representation of `Value::Secret`.
3222fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
3223    let nonce_bytes = crate::auth::store::random_bytes(12);
3224    let mut nonce = [0u8; 12];
3225    nonce.copy_from_slice(&nonce_bytes[..12]);
3226    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
3227    let mut out = Vec::with_capacity(12 + ct.len());
3228    out.extend_from_slice(&nonce);
3229    out.extend_from_slice(&ct);
3230    out
3231}
3232
3233/// Decode a `Value::Secret` payload back to plaintext. Returns
3234/// `None` when the payload is too short or AES-GCM authentication
3235/// fails (tampered or wrong key).
3236pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
3237    if payload.len() < 12 {
3238        return None;
3239    }
3240    let mut nonce = [0u8; 12];
3241    nonce.copy_from_slice(&payload[..12]);
3242    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
3243}
3244
3245fn split_insert_metadata(
3246    runtime: &RedDBRuntime,
3247    columns: &[String],
3248    values: &[Value],
3249) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
3250    let mut fields = Vec::new();
3251    let mut metadata = Vec::new();
3252
3253    for (column, value) in columns.iter().zip(values.iter()) {
3254        // Still support legacy _ttl columns for backward compat
3255        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
3256            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
3257            let (canonical_key, canonical_value) =
3258                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
3259            metadata.push((canonical_key.to_string(), canonical_value));
3260            continue;
3261        }
3262        fields.push((
3263            column.clone(),
3264            runtime.resolve_crypto_sentinel(value.clone())?,
3265        ));
3266    }
3267
3268    Ok((fields, metadata))
3269}
3270
3271/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
3272fn merge_with_clauses(
3273    metadata: &mut Vec<(String, MetadataValue)>,
3274    ttl_ms: Option<u64>,
3275    expires_at_ms: Option<u64>,
3276    with_metadata: &[(String, Value)],
3277) {
3278    if let Some(ms) = ttl_ms {
3279        metadata.push((
3280            "_ttl_ms".to_string(),
3281            if ms <= i64::MAX as u64 {
3282                MetadataValue::Int(ms as i64)
3283            } else {
3284                MetadataValue::Timestamp(ms)
3285            },
3286        ));
3287    }
3288    if let Some(ms) = expires_at_ms {
3289        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
3290    }
3291    for (key, value) in with_metadata {
3292        let meta_value = match value {
3293            Value::Text(s) => MetadataValue::String(s.to_string()),
3294            Value::Integer(n) => MetadataValue::Int(*n),
3295            Value::Float(n) => MetadataValue::Float(*n),
3296            Value::Boolean(b) => MetadataValue::Bool(*b),
3297            _ => MetadataValue::String(value.to_string()),
3298        };
3299        metadata.push((key.clone(), meta_value));
3300    }
3301}
3302
3303fn merge_vector_metadata_column(
3304    metadata: &mut Vec<(String, MetadataValue)>,
3305    columns: &[String],
3306    values: &[Value],
3307) -> RedDBResult<()> {
3308    let Some(value) = columns
3309        .iter()
3310        .position(|column| column.eq_ignore_ascii_case("metadata"))
3311        .map(|index| &values[index])
3312    else {
3313        return Ok(());
3314    };
3315    let json = match value {
3316        Value::Null => return Ok(()),
3317        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
3318            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3319        })?,
3320        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
3321            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
3322        })?,
3323        other => {
3324            return Err(RedDBError::Query(format!(
3325                "column 'metadata' expected JSON object, got {other:?}"
3326            )))
3327        }
3328    };
3329    let parsed = metadata_from_json(&json)?;
3330    for (key, value) in parsed.iter() {
3331        metadata.push((key.clone(), value.clone()));
3332    }
3333    Ok(())
3334}
3335
3336fn apply_collection_default_ttl_metadata(
3337    runtime: &RedDBRuntime,
3338    collection: &str,
3339    metadata: &mut Vec<(String, MetadataValue)>,
3340) {
3341    if has_internal_ttl_metadata(metadata) {
3342        return;
3343    }
3344
3345    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
3346        return;
3347    };
3348
3349    metadata.push((
3350        "_ttl_ms".to_string(),
3351        if default_ttl_ms <= i64::MAX as u64 {
3352            MetadataValue::Int(default_ttl_ms as i64)
3353        } else {
3354            MetadataValue::Timestamp(default_ttl_ms)
3355        },
3356    ));
3357}
3358
3359fn ensure_non_tree_reserved_metadata_entries(
3360    metadata: &[(String, MetadataValue)],
3361) -> RedDBResult<()> {
3362    for (key, _) in metadata {
3363        ensure_non_tree_reserved_metadata_key(key)?;
3364    }
3365    Ok(())
3366}
3367
3368fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
3369    if key.starts_with(TREE_METADATA_PREFIX) {
3370        return Err(RedDBError::Query(format!(
3371            "metadata key '{}' is reserved for managed trees",
3372            key
3373        )));
3374    }
3375    Ok(())
3376}
3377
3378fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
3379    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
3380        return Err(RedDBError::Query(format!(
3381            "edge label '{}' is reserved for managed trees",
3382            TREE_CHILD_EDGE_LABEL
3383        )));
3384    }
3385    Ok(())
3386}
3387
3388fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
3389    let mut columns = Vec::with_capacity(pairs.len());
3390    let mut values = Vec::with_capacity(pairs.len());
3391
3392    for (column, value) in pairs {
3393        columns.push(column.clone());
3394        values.push(value.clone());
3395    }
3396
3397    (columns, values)
3398}
3399
3400/// Find a required column value and return it as-is.
3401fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
3402    for (i, col) in columns.iter().enumerate() {
3403        if col.eq_ignore_ascii_case(name) {
3404            return Ok(values[i].clone());
3405        }
3406    }
3407    Err(RedDBError::Query(format!(
3408        "required column '{name}' not found in INSERT"
3409    )))
3410}
3411
3412/// Find a required column value and coerce to String.
3413fn find_column_value_string(
3414    columns: &[String],
3415    values: &[Value],
3416    name: &str,
3417) -> RedDBResult<String> {
3418    let val = find_column_value(columns, values, name)?;
3419    match val {
3420        Value::Text(s) => Ok(s.to_string()),
3421        Value::Integer(n) => Ok(n.to_string()),
3422        Value::Float(n) => Ok(n.to_string()),
3423        other => Err(RedDBError::Query(format!(
3424            "column '{name}' expected text, got {other:?}"
3425        ))),
3426    }
3427}
3428
3429fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
3430    let val = find_column_value(columns, values, name)?;
3431    match val {
3432        Value::Float(n) => Ok(n),
3433        Value::Integer(n) => Ok(n as f64),
3434        Value::UnsignedInteger(n) => Ok(n as f64),
3435        Value::Text(s) => s
3436            .parse::<f64>()
3437            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
3438        other => Err(RedDBError::Query(format!(
3439            "column '{name}' expected number, got {other:?}"
3440        ))),
3441    }
3442}
3443
3444/// Find an optional column value as String.
3445fn find_column_value_opt_string(
3446    columns: &[String],
3447    values: &[Value],
3448    name: &str,
3449) -> Option<String> {
3450    for (i, col) in columns.iter().enumerate() {
3451        if col.eq_ignore_ascii_case(name) {
3452            return match &values[i] {
3453                Value::Null => None,
3454                Value::Text(s) => Some(s.to_string()),
3455                Value::Integer(n) => Some(n.to_string()),
3456                Value::Float(n) => Some(n.to_string()),
3457                _ => None,
3458            };
3459        }
3460    }
3461    None
3462}
3463
3464/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
3465///
3466/// Accepts integer literals, decimal strings, and node labels resolved via
3467/// the per-collection graph label index (same source of truth that
3468/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
3469/// labels error so callers can fall back to the numeric id form.
3470fn resolve_edge_endpoint(
3471    store: &crate::storage::unified::UnifiedStore,
3472    collection: &str,
3473    columns: &[String],
3474    values: &[Value],
3475    name: &str,
3476) -> RedDBResult<u64> {
3477    let val = find_column_value(columns, values, name)?;
3478    match val {
3479        Value::Integer(n) => Ok(n as u64),
3480        Value::UnsignedInteger(n) => Ok(n),
3481        Value::Text(s) => {
3482            if let Ok(n) = s.parse::<u64>() {
3483                return Ok(n);
3484            }
3485            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
3486            match matches.len() {
3487                0 => Err(RedDBError::Query(format!(
3488                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
3489                ))),
3490                1 => Ok(matches[0].raw()),
3491                n => Err(RedDBError::Query(format!(
3492                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
3493                ))),
3494            }
3495        }
3496        other => Err(RedDBError::Query(format!(
3497            "column '{name}' expected integer or node label, got {other:?}"
3498        ))),
3499    }
3500}
3501
3502fn resolve_edge_endpoint_any(
3503    store: &crate::storage::unified::UnifiedStore,
3504    collection: &str,
3505    columns: &[String],
3506    values: &[Value],
3507    names: &[&str],
3508) -> RedDBResult<u64> {
3509    for name in names {
3510        if columns
3511            .iter()
3512            .any(|column| column.eq_ignore_ascii_case(name))
3513        {
3514            return resolve_edge_endpoint(store, collection, columns, values, name);
3515        }
3516    }
3517
3518    Err(RedDBError::Query(format!(
3519        "required column '{}' not found in INSERT",
3520        names.first().copied().unwrap_or("from_rid")
3521    )))
3522}
3523
3524/// Find a required column value and coerce to u64.
3525fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3526    let val = find_column_value(columns, values, name)?;
3527    match val {
3528        Value::Integer(n) => Ok(n as u64),
3529        Value::UnsignedInteger(n) => Ok(n),
3530        Value::Text(s) => s
3531            .parse::<u64>()
3532            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3533        other => Err(RedDBError::Query(format!(
3534            "column '{name}' expected integer, got {other:?}"
3535        ))),
3536    }
3537}
3538
3539/// Find an optional column value as f32.
3540fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3541    for (i, col) in columns.iter().enumerate() {
3542        if col.eq_ignore_ascii_case(name) {
3543            return match &values[i] {
3544                Value::Float(n) => Some(*n as f32),
3545                Value::Integer(n) => Some(*n as f32),
3546                Value::Null => None,
3547                _ => None,
3548            };
3549        }
3550    }
3551    None
3552}
3553
3554/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
3555fn find_column_value_vec_f32(
3556    columns: &[String],
3557    values: &[Value],
3558    name: &str,
3559) -> RedDBResult<Vec<f32>> {
3560    let val = find_column_value(columns, values, name)?;
3561    match val {
3562        Value::Vector(v) => Ok(v),
3563        Value::Json(bytes) => {
3564            // Try to parse as JSON array of numbers
3565            let s = std::str::from_utf8(&bytes).map_err(|_| {
3566                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3567            })?;
3568            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3569                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3570            })?;
3571            Ok(arr)
3572        }
3573        other => Err(RedDBError::Query(format!(
3574            "column '{name}' expected vector, got {other:?}"
3575        ))),
3576    }
3577}
3578
3579fn find_column_value_vec_f32_any(
3580    columns: &[String],
3581    values: &[Value],
3582    names: &[&str],
3583) -> RedDBResult<Vec<f32>> {
3584    for name in names {
3585        if columns
3586            .iter()
3587            .any(|column| column.eq_ignore_ascii_case(name))
3588        {
3589            return find_column_value_vec_f32(columns, values, name);
3590        }
3591    }
3592    Err(RedDBError::Query(format!(
3593        "required vector column '{}' not found in INSERT",
3594        names.join("' or '")
3595    )))
3596}
3597
3598/// Extract remaining properties (all columns not in the exclusion list).
3599fn extract_remaining_properties(
3600    columns: &[String],
3601    values: &[Value],
3602    exclude: &[&str],
3603) -> Vec<(String, Value)> {
3604    columns
3605        .iter()
3606        .zip(values.iter())
3607        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3608        .map(|(col, val)| (col.clone(), val.clone()))
3609        .collect()
3610}
3611
3612fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3613    let mut invalid = Vec::new();
3614    for column in columns {
3615        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3616            invalid.push(column.clone());
3617        }
3618    }
3619
3620    if invalid.is_empty() {
3621        Ok(())
3622    } else {
3623        Err(RedDBError::Query(format!(
3624            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3625            invalid.join(", ")
3626        )))
3627    }
3628}
3629
/// Return `true` when `column` is one of the column names a timeseries
/// INSERT understands natively. The comparison is ASCII case-insensitive
/// and performs no allocation.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED_COLUMNS: [&str; 8] = [
        "metric",
        "value",
        "tags",
        "timestamp",
        "timestamp_ns",
        "time",
        // Analytics-event extension (#577): an analytics row carries
        // an `event_name` + JSON `payload`. The payload is validated
        // against the AnalyticsSchemaRegistry inside
        // `insert_timeseries_point` before the row lands.
        "event_name",
        "payload",
    ];

    ACCEPTED_COLUMNS
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
3647
3648fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3649    let mut found = None;
3650
3651    for alias in ["timestamp_ns", "timestamp", "time"] {
3652        for (index, column) in columns.iter().enumerate() {
3653            if !column.eq_ignore_ascii_case(alias) {
3654                continue;
3655            }
3656
3657            if found.is_some() {
3658                return Err(RedDBError::Query(
3659                    "timeseries INSERT accepts only one timestamp column".to_string(),
3660                ));
3661            }
3662
3663            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3664        }
3665    }
3666
3667    Ok(found)
3668}
3669
3670fn find_timeseries_tags(
3671    columns: &[String],
3672    values: &[Value],
3673) -> RedDBResult<std::collections::HashMap<String, String>> {
3674    for (index, column) in columns.iter().enumerate() {
3675        if column.eq_ignore_ascii_case("tags") {
3676            return parse_timeseries_tags(&values[index]);
3677        }
3678    }
3679    Ok(std::collections::HashMap::new())
3680}
3681
3682fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3683    match value {
3684        Value::Null => Ok(std::collections::HashMap::new()),
3685        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3686        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3687        other => Err(RedDBError::Query(format!(
3688            "timeseries tags must be a JSON object or JSON text, got {other:?}"
3689        ))),
3690    }
3691}
3692
3693fn parse_timeseries_tags_json(
3694    bytes: &[u8],
3695) -> RedDBResult<std::collections::HashMap<String, String>> {
3696    let json: crate::json::Value = crate::json::from_slice(bytes)
3697        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3698
3699    let object = match json {
3700        crate::json::Value::Object(object) => object,
3701        other => {
3702            return Err(RedDBError::Query(format!(
3703                "timeseries tags must be a JSON object, got {other:?}"
3704            )))
3705        }
3706    };
3707
3708    let mut tags = std::collections::HashMap::with_capacity(object.len());
3709    for (key, value) in object {
3710        tags.insert(key, json_tag_value_to_string(&value));
3711    }
3712    Ok(tags)
3713}
3714
3715/// Encode a tag value for storage so the original JSON type can be
3716/// recovered on read (issue #543).
3717///
3718/// Time-series tags are stored as `HashMap<String, String>` on the
3719/// physical record (see [`crate::storage::TimeSeriesData`]) so that
3720/// the segment codec, WAL and gRPC mirrors don't need a new value
3721/// variant. To preserve the original JSON type across that
3722/// string-only channel we prepend the
3723/// [`crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX`] marker
3724/// and serialize the value as compact JSON text. The read paths
3725/// (`timeseries_tags_json_value` / `timeseries_tags_value`) detect
3726/// the marker, parse the suffix, and recover a real JSON value.
3727/// Tags written through other channels (Prometheus remote write,
3728/// metrics handlers, legacy on-disk data) lack the marker and are
3729/// returned as `JsonValue::String(raw)` exactly as before.
3730fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3731    let mut buf = String::with_capacity(value.to_string_compact().len() + 1);
3732    buf.push(crate::runtime::query_exec::TIMESERIES_TAG_JSON_PREFIX);
3733    buf.push_str(&value.to_string_compact());
3734    buf
3735}
3736
3737fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3738    match value {
3739        Value::UnsignedInteger(value) => Ok(*value),
3740        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3741        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3742        Value::Text(value) => value.parse::<u64>().map_err(|_| {
3743            RedDBError::Query(format!(
3744                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3745            ))
3746        }),
3747        other => Err(RedDBError::Query(format!(
3748            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3749        ))),
3750    }
3751}
3752
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// A clock set before the epoch yields `0`; a nanosecond count that does
/// not fit in `u64` is clamped to `u64::MAX`.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
3760
3761fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3762    use crate::json::{Map, Value as JV};
3763    match value {
3764        MetadataValue::Null => JV::Null,
3765        MetadataValue::Bool(value) => JV::Bool(*value),
3766        MetadataValue::Int(value) => JV::Number(*value as f64),
3767        MetadataValue::Float(value) => JV::Number(*value),
3768        MetadataValue::String(value) => JV::String(value.clone()),
3769        MetadataValue::Bytes(value) => JV::Array(
3770            value
3771                .iter()
3772                .map(|value| JV::Number(*value as f64))
3773                .collect(),
3774        ),
3775        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3776        MetadataValue::Array(values) => {
3777            JV::Array(values.iter().map(metadata_value_to_json).collect())
3778        }
3779        MetadataValue::Object(object) => {
3780            let entries = object
3781                .iter()
3782                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3783                .collect();
3784            JV::Object(entries)
3785        }
3786        MetadataValue::Geo { lat, lon } => {
3787            let mut object = Map::new();
3788            object.insert("lat".to_string(), JV::Number(*lat));
3789            object.insert("lon".to_string(), JV::Number(*lon));
3790            JV::Object(object)
3791        }
3792        MetadataValue::Reference(target) => {
3793            let mut object = Map::new();
3794            object.insert(
3795                "collection".to_string(),
3796                JV::String(target.collection().to_string()),
3797            );
3798            object.insert(
3799                "entity_id".to_string(),
3800                JV::Number(target.entity_id().raw() as f64),
3801            );
3802            JV::Object(object)
3803        }
3804        MetadataValue::References(values) => {
3805            let refs = values
3806                .iter()
3807                .map(|target| {
3808                    let mut object = Map::new();
3809                    object.insert(
3810                        "collection".to_string(),
3811                        JV::String(target.collection().to_string()),
3812                    );
3813                    object.insert(
3814                        "entity_id".to_string(),
3815                        JV::Number(target.entity_id().raw() as f64),
3816                    );
3817                    JV::Object(object)
3818                })
3819                .collect();
3820            JV::Array(refs)
3821        }
3822    }
3823}
3824
3825fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3826    match value {
3827        Value::Null => MetadataValue::Null,
3828        Value::Boolean(value) => MetadataValue::Bool(*value),
3829        Value::Integer(value) => MetadataValue::Int(*value),
3830        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3831        Value::Float(value) => MetadataValue::Float(*value),
3832        Value::Text(value) => MetadataValue::String(value.to_string()),
3833        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3834        Value::Timestamp(value) => {
3835            if *value >= 0 {
3836                metadata_u64_to_value(*value as u64)
3837            } else {
3838                MetadataValue::Int(*value)
3839            }
3840        }
3841        Value::TimestampMs(value) => {
3842            if *value >= 0 {
3843                metadata_u64_to_value(*value as u64)
3844            } else {
3845                MetadataValue::Int(*value)
3846            }
3847        }
3848        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3849        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3850        Value::Date(value) => MetadataValue::String(value.to_string()),
3851        Value::Time(value) => MetadataValue::String(value.to_string()),
3852        Value::Decimal(value) => MetadataValue::String(value.to_string()),
3853        Value::Ipv4(value) => MetadataValue::String(format!(
3854            "{}.{}.{}.{}",
3855            (value >> 24) & 0xFF,
3856            (value >> 16) & 0xFF,
3857            (value >> 8) & 0xFF,
3858            value & 0xFF
3859        )),
3860        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3861        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3862        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3863        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3864            lat: *lat as f64 / 1_000_000.0,
3865            lon: *lon as f64 / 1_000_000.0,
3866        },
3867        Value::BigInt(value) => MetadataValue::String(value.to_string()),
3868        Value::TableRef(value) => MetadataValue::String(value.clone()),
3869        Value::PageRef(value) => MetadataValue::Int(*value as i64),
3870        Value::Password(value) => MetadataValue::String(value.clone()),
3871        Value::Array(values) => {
3872            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3873        }
3874        _ => MetadataValue::String(value.to_string()),
3875    }
3876}
3877
3878fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3879    match value {
3880        Value::Null => Ok(MetadataValue::Null),
3881        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3882        Value::Integer(_) => Err(RedDBError::Query(format!(
3883            "column '{field}' must be non-negative for TTL metadata"
3884        ))),
3885        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3886        Value::Float(value) if value.is_finite() => {
3887            if value.fract().abs() >= f64::EPSILON {
3888                return Err(RedDBError::Query(format!(
3889                    "column '{field}' must be an integer (TTL metadata must be an integer)"
3890                )));
3891            }
3892            if *value < 0.0 {
3893                return Err(RedDBError::Query(format!(
3894                    "column '{field}' must be non-negative for TTL metadata"
3895                )));
3896            }
3897            if *value > u64::MAX as f64 {
3898                return Err(RedDBError::Query(format!(
3899                    "column '{field}' value is too large"
3900                )));
3901            }
3902            Ok(metadata_u64_to_value(*value as u64))
3903        }
3904        Value::Float(_) => Err(RedDBError::Query(format!(
3905            "column '{field}' must be a finite number"
3906        ))),
3907        Value::Text(value) => {
3908            let value = value.trim();
3909            if let Ok(value) = value.parse::<u64>() {
3910                Ok(metadata_u64_to_value(value))
3911            } else if let Ok(value) = value.parse::<i64>() {
3912                if value < 0 {
3913                    return Err(RedDBError::Query(format!(
3914                        "column '{field}' must be non-negative for TTL metadata"
3915                    )));
3916                }
3917                Ok(metadata_u64_to_value(value as u64))
3918            } else if let Ok(value) = value.parse::<f64>() {
3919                if !value.is_finite() {
3920                    return Err(RedDBError::Query(format!(
3921                        "column '{field}' must be a finite number"
3922                    )));
3923                }
3924                if value.fract().abs() >= f64::EPSILON {
3925                    return Err(RedDBError::Query(format!(
3926                        "column '{field}' must be an integer (TTL metadata must be an integer)"
3927                    )));
3928                }
3929                if value < 0.0 {
3930                    return Err(RedDBError::Query(format!(
3931                        "column '{field}' must be non-negative for TTL metadata"
3932                    )));
3933                }
3934                if value > u64::MAX as f64 {
3935                    return Err(RedDBError::Query(format!(
3936                        "column '{field}' value is too large"
3937                    )));
3938                }
3939                Ok(metadata_u64_to_value(value as u64))
3940            } else {
3941                Err(RedDBError::Query(format!(
3942                    "column '{field}' expects a numeric value for TTL metadata"
3943                )))
3944            }
3945        }
3946        _ => Err(RedDBError::Query(format!(
3947            "column '{field}' expects a numeric value for TTL metadata"
3948        ))),
3949    }
3950}
3951
3952fn metadata_u64_to_value(value: u64) -> MetadataValue {
3953    if value <= i64::MAX as u64 {
3954        MetadataValue::Int(value as i64)
3955    } else {
3956        MetadataValue::Timestamp(value)
3957    }
3958}
3959
3960/// Phase 2 PG parity: inspect a column value and return `true` when
3961/// the dotted `tail` path is already present under it. Used by the
3962/// tenant auto-fill so rows that already carry an explicit value
3963/// (bulk import, admin insert on behalf of a tenant) are not
3964/// double-stamped with the session's current_tenant().
3965fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3966    let json = match value {
3967        Value::Null => return false,
3968        Value::Json(bytes) | Value::Blob(bytes) => {
3969            match crate::json::from_slice::<crate::json::Value>(bytes) {
3970                Ok(v) => v,
3971                Err(_) => return false,
3972            }
3973        }
3974        Value::Text(s) => {
3975            let trimmed = s.trim_start();
3976            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3977                return false;
3978            }
3979            match crate::json::from_str::<crate::json::Value>(s) {
3980                Ok(v) => v,
3981                Err(_) => return false,
3982            }
3983        }
3984        _ => return false,
3985    };
3986    let mut cursor = &json;
3987    for seg in tail.split('.') {
3988        match cursor {
3989            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3990                Some((_, v)) => cursor = v,
3991                None => return false,
3992            },
3993            _ => return false,
3994        }
3995    }
3996    !matches!(cursor, crate::json::Value::Null)
3997}
3998
3999/// Phase 2 PG parity: take a column value (possibly Null / Text /
4000/// Json) and return a `Value::Json` with the dotted `tail` path set
4001/// to `tenant_id`. Preserves every pre-existing key.
4002///
4003/// Accepts:
4004/// * `Value::Null`  → fresh `{tail: tenant_id}` object
4005/// * `Value::Json(bytes)` → parse, navigate / create path, re-serialize
4006/// * `Value::text(s)` if `s` is valid JSON → same as Json
4007/// * anything else → error (user supplied a scalar where we need
4008///   a JSON container)
4009fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
4010    let mut root = match current {
4011        Value::Null => crate::json::Value::Object(Default::default()),
4012        Value::Json(bytes) | Value::Blob(bytes) => {
4013            crate::json::from_slice(&bytes).map_err(|err| {
4014                RedDBError::Query(format!(
4015                    "tenant auto-fill: root column is not valid JSON ({err})"
4016                ))
4017            })?
4018        }
4019        Value::Text(s) => {
4020            if s.trim().is_empty() {
4021                crate::json::Value::Object(Default::default())
4022            } else {
4023                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
4024                    RedDBError::Query(format!(
4025                        "tenant auto-fill: text root is not valid JSON ({err})"
4026                    ))
4027                })?
4028            }
4029        }
4030        other => {
4031            return Err(RedDBError::Query(format!(
4032                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
4033            )));
4034        }
4035    };
4036
4037    // Navigate path segments, creating intermediate objects on demand.
4038    let segments: Vec<&str> = tail.split('.').collect();
4039    let mut cursor: &mut crate::json::Value = &mut root;
4040    for (i, seg) in segments.iter().enumerate() {
4041        let is_last = i + 1 == segments.len();
4042        let map = match cursor {
4043            crate::json::Value::Object(m) => m,
4044            _ => {
4045                return Err(RedDBError::Query(format!(
4046                    "tenant auto-fill: segment '{seg}' is not inside an object"
4047                )));
4048            }
4049        };
4050        if is_last {
4051            map.insert(
4052                seg.to_string(),
4053                crate::json::Value::String(tenant_id.to_string()),
4054            );
4055            break;
4056        }
4057        cursor = map
4058            .entry(seg.to_string())
4059            .or_insert_with(|| crate::json::Value::Object(Default::default()));
4060    }
4061
4062    let bytes = crate::json::to_vec(&root).map_err(|err| {
4063        RedDBError::Query(format!(
4064            "tenant auto-fill: failed to re-serialize JSON ({err})"
4065        ))
4066    })?;
4067    Ok(Value::Json(bytes))
4068}
4069
4070#[cfg(test)]
4071mod tests {
4072    use crate::storage::schema::Value;
4073    use crate::storage::wal::{WalReader, WalRecord};
4074    use crate::{RedDBOptions, RedDBRuntime};
4075    use std::path::Path;
4076
4077    fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
4078        WalReader::open(wal_path)
4079            .expect("wal opens")
4080            .iter()
4081            .map(|record| record.expect("wal record decodes").1)
4082            .filter_map(|record| match record {
4083                WalRecord::TxCommitBatch { actions, .. } => Some(actions),
4084                _ => None,
4085            })
4086            .collect()
4087    }
4088
4089    fn action_contains_text(action: &[u8], needle: &str) -> bool {
4090        action
4091            .windows(needle.len())
4092            .any(|window| window == needle.as_bytes())
4093    }
4094
4095    fn assert_statement_writes_collections_in_one_new_wal_batch(
4096        rt: &RedDBRuntime,
4097        wal_path: &Path,
4098        statement: &str,
4099        source: &str,
4100        event_queue: &str,
4101    ) {
4102        let before_batches = store_commit_batches(wal_path).len();
4103
4104        rt.execute_query(statement).unwrap();
4105
4106        let batches = store_commit_batches(wal_path);
4107        let statement_batches = &batches[before_batches..];
4108        let source_batch = statement_batches
4109            .iter()
4110            .position(|actions| {
4111                actions.iter().any(|action| {
4112                    action_contains_text(action, source)
4113                        && !action_contains_text(action, event_queue)
4114                })
4115            })
4116            .expect("source collection write batch is present");
4117        let event_batch = statement_batches
4118            .iter()
4119            .position(|actions| {
4120                actions
4121                    .iter()
4122                    .any(|action| action_contains_text(action, event_queue))
4123            })
4124            .expect("event queue write batch is present");
4125
4126        assert_eq!(
4127            source_batch, event_batch,
4128            "WITH EVENTS must persist the source write and queue event in the same WAL batch"
4129        );
4130    }
4131
4132    #[test]
4133    fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
4134        let dir = tempfile::tempdir().unwrap();
4135        let db_path = dir.path().join("events_dual_write.rdb");
4136        let wal_path = db_path.with_extension("rdb-uwal");
4137        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4138
4139        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4140            .unwrap();
4141        assert_statement_writes_collections_in_one_new_wal_batch(
4142            &rt,
4143            &wal_path,
4144            "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
4145            "users",
4146            "users_events",
4147        );
4148    }
4149
4150    #[test]
4151    fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
4152        let dir = tempfile::tempdir().unwrap();
4153        let db_path = dir.path().join("events_update_atomic.rdb");
4154        let wal_path = db_path.with_extension("rdb-uwal");
4155        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4156
4157        rt.execute_query(
4158            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
4159        )
4160        .unwrap();
4161        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4162            .unwrap();
4163
4164        assert_statement_writes_collections_in_one_new_wal_batch(
4165            &rt,
4166            &wal_path,
4167            "UPDATE users SET email = 'b@example.test' WHERE id = 1",
4168            "users",
4169            "user_updates",
4170        );
4171    }
4172
4173    #[test]
4174    fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
4175        let dir = tempfile::tempdir().unwrap();
4176        let db_path = dir.path().join("events_delete_atomic.rdb");
4177        let wal_path = db_path.with_extension("rdb-uwal");
4178        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
4179
4180        rt.execute_query(
4181            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
4182        )
4183        .unwrap();
4184        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
4185            .unwrap();
4186
4187        assert_statement_writes_collections_in_one_new_wal_batch(
4188            &rt,
4189            &wal_path,
4190            "DELETE FROM users WHERE id = 1",
4191            "users",
4192            "user_deletes",
4193        );
4194    }
4195
4196    #[test]
4197    fn update_where_id_in_with_hash_index_updates_expected_rows() {
4198        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4199        rt.execute_query("CREATE TABLE users (id INT, score INT)")
4200            .unwrap();
4201        for id in 0..5 {
4202            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
4203                .unwrap();
4204        }
4205        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
4206            .unwrap();
4207
4208        let updated = rt
4209            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
4210            .unwrap();
4211        assert_eq!(updated.affected_rows, 3);
4212
4213        let selected = rt
4214            .execute_query("SELECT id, score FROM users ORDER BY id")
4215            .unwrap();
4216        let scores: Vec<(i64, i64)> = selected
4217            .result
4218            .records
4219            .iter()
4220            .map(|record| {
4221                let id = match record.get("id").unwrap() {
4222                    Value::Integer(value) => *value,
4223                    other => panic!("expected integer id, got {other:?}"),
4224                };
4225                let score = match record.get("score").unwrap() {
4226                    Value::Integer(value) => *value,
4227                    other => panic!("expected integer score, got {other:?}"),
4228                };
4229                (id, score)
4230            })
4231            .collect();
4232        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
4233    }
4234
4235    /// Drives UPDATE through the shared `DmlTargetScan` module — the
4236    /// same code path DELETE uses (#51, #52). Exercises the indexed
4237    /// equality fast-path (WHERE id = N with a HASH index), the
4238    /// unindexed range scan (WHERE score > N), and the no-WHERE
4239    /// full-scan branch to confirm the extracted "find target rows"
4240    /// loop preserves affected-row counts and the resulting row state.
4241    #[test]
4242    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4243        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4244        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4245            .unwrap();
4246        for id in 0..5 {
4247            rt.execute_query(&format!(
4248                "INSERT INTO items (id, score) VALUES ({id}, {})",
4249                id * 10
4250            ))
4251            .unwrap();
4252        }
4253        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4254            .unwrap();
4255
4256        // Indexed equality UPDATE — hits the hash fast-path inside
4257        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
4258        // below the score>25 cutoff so the next assertion stays clean.
4259        let updated_one = rt
4260            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
4261            .unwrap();
4262        assert_eq!(updated_one.affected_rows, 1);
4263
4264        // Unindexed scan UPDATE — bumps everyone with score > 25,
4265        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4266        // zoned/full-scan branch.
4267        let updated_many = rt
4268            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
4269            .unwrap();
4270        assert_eq!(updated_many.affected_rows, 2);
4271
4272        let snapshot = rt
4273            .execute_query("SELECT id, score FROM items ORDER BY id")
4274            .unwrap();
4275        let pairs: Vec<(i64, i64)> = snapshot
4276            .result
4277            .records
4278            .iter()
4279            .map(|record| {
4280                let id = match record.get("id").unwrap() {
4281                    Value::Integer(value) => *value,
4282                    other => panic!("expected integer id, got {other:?}"),
4283                };
4284                let score = match record.get("score").unwrap() {
4285                    Value::Integer(value) => *value,
4286                    other => panic!("expected integer score, got {other:?}"),
4287                };
4288                (id, score)
4289            })
4290            .collect();
4291        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);
4292
4293        // Full-scan UPDATE with no WHERE rewrites every remaining row.
4294        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
4295        assert_eq!(updated_all.affected_rows, 5);
4296        let after = rt
4297            .execute_query("SELECT score FROM items ORDER BY id")
4298            .unwrap();
4299        let scores: Vec<i64> = after
4300            .result
4301            .records
4302            .iter()
4303            .map(|record| match record.get("score").unwrap() {
4304                Value::Integer(value) => *value,
4305                other => panic!("expected integer score, got {other:?}"),
4306            })
4307            .collect();
4308        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
4309    }
4310
4311    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
4312    /// both the index fast-path (WHERE id = N with a HASH index) and
4313    /// the unindexed scan path (WHERE score > N) to confirm the
4314    /// extracted "find target rows" loop preserves the affected-row
4315    /// count and which rows survive.
4316    #[test]
4317    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
4318        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4319        rt.execute_query("CREATE TABLE items (id INT, score INT)")
4320            .unwrap();
4321        for id in 0..5 {
4322            rt.execute_query(&format!(
4323                "INSERT INTO items (id, score) VALUES ({id}, {})",
4324                id * 10
4325            ))
4326            .unwrap();
4327        }
4328        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
4329            .unwrap();
4330
4331        // Indexed equality DELETE — hits the hash fast-path inside
4332        // DmlTargetScan::find_target_ids.
4333        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
4334        assert_eq!(deleted_one.affected_rows, 1);
4335
4336        // Unindexed scan DELETE — drops everyone with score > 25,
4337        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
4338        // zoned/full-scan branch.
4339        let deleted_many = rt
4340            .execute_query("DELETE FROM items WHERE score > 25")
4341            .unwrap();
4342        assert_eq!(deleted_many.affected_rows, 2);
4343
4344        let surviving = rt
4345            .execute_query("SELECT id FROM items ORDER BY id")
4346            .unwrap();
4347        let ids: Vec<i64> = surviving
4348            .result
4349            .records
4350            .iter()
4351            .map(|record| match record.get("id").unwrap() {
4352                Value::Integer(value) => *value,
4353                other => panic!("expected integer id, got {other:?}"),
4354            })
4355            .collect();
4356        assert_eq!(ids, vec![0, 1]);
4357
4358        // Sanity: full-scan DELETE with no WHERE clears the rest.
4359        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
4360        assert_eq!(deleted_rest.affected_rows, 2);
4361        let empty = rt.execute_query("SELECT id FROM items").unwrap();
4362        assert!(empty.result.records.is_empty());
4363    }
4364
4365    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
4366    /// INSERT but reject UPDATE and DELETE with the documented
4367    /// operator-facing error strings. Drives all three DML verbs so
4368    /// the centralized gate is exercised end-to-end.
4369    #[test]
4370    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
4371        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4372        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
4373            .unwrap();
4374
4375        // INSERT must succeed — APPEND ONLY exists precisely to allow
4376        // appends. The gate should be a no-op for INSERT.
4377        let inserted = rt
4378            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
4379            .unwrap();
4380        assert_eq!(inserted.affected_rows, 1);
4381
4382        // UPDATE is rejected with the gate's UPDATE-specific message.
4383        let update_err = rt
4384            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
4385            .unwrap_err();
4386        let msg = format!("{update_err}");
4387        assert!(
4388            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
4389            "expected UPDATE rejection message, got: {msg}"
4390        );
4391
4392        // DELETE is rejected with the gate's DELETE-specific message.
4393        let delete_err = rt
4394            .execute_query("DELETE FROM events WHERE id = 1")
4395            .unwrap_err();
4396        let msg = format!("{delete_err}");
4397        assert!(
4398            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
4399            "expected DELETE rejection message, got: {msg}"
4400        );
4401
4402        // Row should still be present — neither rejected mutation
4403        // touched storage.
4404        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
4405        assert_eq!(surviving.result.records.len(), 1);
4406    }
4407
4408    /// CollectionContract gate: tables without an APPEND ONLY contract
4409    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
4410    /// is a true pass-through, not an accidental block.
4411    #[test]
4412    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
4413        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4414        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
4415            .unwrap();
4416
4417        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
4418            .unwrap();
4419        let updated = rt
4420            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
4421            .unwrap();
4422        assert_eq!(updated.affected_rows, 1);
4423        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
4424        assert_eq!(deleted.affected_rows, 1);
4425    }
4426
4427    #[test]
4428    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
4429        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4430        rt.execute_query(
4431            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
4432        )
4433        .unwrap();
4434
4435        let inserted = rt
4436            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
4437            .unwrap();
4438        assert_eq!(inserted.affected_rows, 1);
4439
4440        let events = queue_payloads(&rt, "audit_log");
4441        assert_eq!(events.len(), 1);
4442        let event = events[0].as_object().expect("event payload object");
4443        assert!(event
4444            .get("event_id")
4445            .and_then(crate::json::Value::as_str)
4446            .is_some_and(|value| !value.is_empty()));
4447        assert_eq!(
4448            event.get("op").and_then(crate::json::Value::as_str),
4449            Some("insert")
4450        );
4451        assert_eq!(
4452            event.get("collection").and_then(crate::json::Value::as_str),
4453            Some("users")
4454        );
4455        assert_eq!(
4456            event.get("id").and_then(crate::json::Value::as_u64),
4457            Some(7)
4458        );
4459        assert!(event
4460            .get("ts")
4461            .and_then(crate::json::Value::as_u64)
4462            .is_some());
4463        assert!(event
4464            .get("lsn")
4465            .and_then(crate::json::Value::as_u64)
4466            .is_some());
4467        assert!(matches!(
4468            event.get("tenant"),
4469            Some(crate::json::Value::Null)
4470        ));
4471        assert!(matches!(
4472            event.get("before"),
4473            Some(crate::json::Value::Null)
4474        ));
4475        let after = event
4476            .get("after")
4477            .and_then(crate::json::Value::as_object)
4478            .expect("after object");
4479        assert_eq!(
4480            after.get("id").and_then(crate::json::Value::as_u64),
4481            Some(7)
4482        );
4483        assert_eq!(
4484            after.get("email").and_then(crate::json::Value::as_str),
4485            Some("a@example.com")
4486        );
4487    }
4488
4489    #[test]
4490    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
4491        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4492        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
4493            .unwrap();
4494
4495        rt.execute_query(
4496            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
4497        )
4498        .unwrap();
4499
4500        let events = queue_payloads(&rt, "users_events");
4501        assert_eq!(events.len(), 2);
4502        let mut previous_lsn = 0;
4503        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
4504            let object = event.as_object().expect("event payload object");
4505            assert_eq!(
4506                object.get("op").and_then(crate::json::Value::as_str),
4507                Some("insert")
4508            );
4509            assert_eq!(
4510                object.get("id").and_then(crate::json::Value::as_u64),
4511                Some(expected_id)
4512            );
4513            let lsn = object
4514                .get("lsn")
4515                .and_then(crate::json::Value::as_u64)
4516                .expect("event lsn");
4517            assert!(
4518                lsn > previous_lsn,
4519                "event LSNs should increase in row order"
4520            );
4521            previous_lsn = lsn;
4522            let after = object
4523                .get("after")
4524                .and_then(crate::json::Value::as_object)
4525                .expect("after object");
4526            assert_eq!(
4527                after.get("id").and_then(crate::json::Value::as_u64),
4528                Some(expected_id)
4529            );
4530        }
4531    }
4532
4533    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
4534        let result = rt
4535            .execute_query(&format!("QUEUE PEEK {queue} 10"))
4536            .expect("peek queue");
4537        result
4538            .result
4539            .records
4540            .iter()
4541            .map(
4542                |record| match record.get("payload").expect("payload column") {
4543                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4544                    other => panic!("expected JSON queue payload, got {other:?}"),
4545                },
4546            )
4547            .collect()
4548    }
4549
4550    // ── #112: auto-index user `id` on first insert ─────────────────────
4551
4552    /// First insert into a fresh collection that carries a column named
4553    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
4554    /// populate it transparently, and `WHERE id = N` lookups exercise
4555    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
4556    ///
4557    /// This is the load-bearing acceptance test for #112 — without the
4558    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
4559    /// fall through to a full segment scan (the 4× perf gap documented
4560    /// in `docs/perf/delete-sequential-2026-05-06.md`).
4561    #[test]
4562    fn auto_index_id_fires_on_first_insert() {
4563        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4564        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4565            .unwrap();
4566
4567        // Pre-condition: no index on `id` yet.
4568        assert!(
4569            rt.index_store_ref()
4570                .find_index_for_column("bench_users", "id")
4571                .is_none(),
4572            "freshly created collection should not have an `id` index"
4573        );
4574
4575        // Single-row INSERT — drives `MutationEngine::append_one`.
4576        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4577            .unwrap();
4578
4579        // Post-condition: hash index registered on `id`.
4580        let registered = rt
4581            .index_store_ref()
4582            .find_index_for_column("bench_users", "id")
4583            .expect("auto-index hook should have registered idx_id on first insert");
4584        assert_eq!(registered.name, "idx_id");
4585        assert_eq!(registered.collection, "bench_users");
4586        assert_eq!(registered.columns, vec!["id".to_string()]);
4587        assert!(matches!(
4588            registered.method,
4589            super::super::index_store::IndexMethodKind::Hash
4590        ));
4591
4592        // Subsequent inserts populate the index; `WHERE id = N` should
4593        // resolve via the hash fast path and round-trip every row.
4594        for id in 2..=5 {
4595            rt.execute_query(&format!(
4596                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4597                id * 10
4598            ))
4599            .unwrap();
4600        }
4601        for id in 1..=5 {
4602            let result = rt
4603                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4604                .unwrap();
4605            assert_eq!(
4606                result.result.records.len(),
4607                1,
4608                "id={id} should match one row"
4609            );
4610        }
4611
4612        // Delete via the hash fast-path — exactly the bench scenario the
4613        // perf doc identified as the 4× regression. With the index
4614        // present, `find_target_ids` short-circuits before
4615        // `for_each_entity_zoned` runs.
4616        let deleted = rt
4617            .execute_query("DELETE FROM bench_users WHERE id = 3")
4618            .unwrap();
4619        assert_eq!(deleted.affected_rows, 1);
4620    }
4621
4622    /// Bulk INSERT (the multi-row VALUES path) drives
4623    /// `MutationEngine::append_batch`. The hook must fire there too —
4624    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
4625    /// wire bulk INSERT) skip auto-indexing entirely.
4626    #[test]
4627    fn auto_index_id_fires_on_first_bulk_insert() {
4628        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4629        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4630            .unwrap();
4631
4632        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4633            .unwrap();
4634
4635        let registered = rt
4636            .index_store_ref()
4637            .find_index_for_column("bench_bulk", "id")
4638            .expect("auto-index hook should fire on first bulk insert");
4639        assert_eq!(registered.name, "idx_id");
4640
4641        // Every row populated via `index_entity_insert_batch`.
4642        for id in 1..=3 {
4643            let result = rt
4644                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4645                .unwrap();
4646            assert_eq!(result.result.records.len(), 1);
4647        }
4648    }
4649
4650    /// Hook is a no-op when the row carries no `id` column. Conservative
4651    /// match (case-sensitive `id`) — `Id`, `ID`, and `red_entity_id`
4652    /// don't trigger it.
4653    #[test]
4654    fn auto_index_id_skips_when_no_id_column() {
4655        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4656        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4657            .unwrap();
4658        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4659            .unwrap();
4660
4661        assert!(rt
4662            .index_store_ref()
4663            .find_index_for_column("plain", "id")
4664            .is_none());
4665        assert!(rt
4666            .index_store_ref()
4667            .find_index_for_column("plain", "uid")
4668            .is_none());
4669    }
4670
4671    /// Hook only fires once per collection. If an explicit
4672    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
4673    /// detects it via `find_index_for_column` and does NOT clobber it
4674    /// with a HASH index on the next insert.
4675    #[test]
4676    fn auto_index_id_skips_when_index_already_exists() {
4677        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4678        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4679            .unwrap();
4680        // User-declared BTREE index on `id` before any insert.
4681        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4682            .unwrap();
4683        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4684            .unwrap();
4685
4686        let registered = rt
4687            .index_store_ref()
4688            .find_index_for_column("pre", "id")
4689            .expect("user index should still be there");
4690        assert_eq!(
4691            registered.name, "user_idx",
4692            "auto-index hook must not overwrite an existing index"
4693        );
4694    }
4695
4696    /// Implicit `idx_id` is reaped when the collection drops. The
4697    /// existing `execute_drop_table` walks `list_indices` and drops every
4698    /// entry — confirm the auto-created index participates.
4699    #[test]
4700    fn auto_index_id_dropped_with_collection() {
4701        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4702        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4703            .unwrap();
4704        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4705            .unwrap();
4706        assert!(rt
4707            .index_store_ref()
4708            .find_index_for_column("ephemeral", "id")
4709            .is_some());
4710
4711        rt.execute_query("DROP TABLE ephemeral").unwrap();
4712
4713        assert!(
4714            rt.index_store_ref()
4715                .find_index_for_column("ephemeral", "id")
4716                .is_none(),
4717            "implicit `idx_id` must be reaped when its collection drops"
4718        );
4719    }
4720
4721    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
4722    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
4723    /// off, first insert leaves the collection without an `id` index —
4724    /// DELETE/UPDATE fall back to the scan path.
4725    #[test]
4726    fn auto_index_id_disabled_by_config() {
4727        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4728        let rt = RedDBRuntime::with_options(opts).unwrap();
4729
4730        rt.execute_query("CREATE TABLE off (id INT, score INT)")
4731            .unwrap();
4732        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4733            .unwrap();
4734
4735        assert!(
4736            rt.index_store_ref()
4737                .find_index_for_column("off", "id")
4738                .is_none(),
4739            "with auto_index_id=false, no implicit index should be created"
4740        );
4741    }
4742
4743    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
4744
4745    #[test]
4746    fn update_single_row_emits_update_event() {
4747        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4748        rt.execute_query(
4749            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4750        )
4751        .unwrap();
4752        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4753            .unwrap();
4754
4755        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4756            .unwrap();
4757
4758        let events = queue_payloads(&rt, "audit_log");
4759        assert_eq!(events.len(), 1, "expected exactly 1 update event");
4760        let event = events[0].as_object().expect("event payload object");
4761        assert_eq!(
4762            event.get("op").and_then(crate::json::Value::as_str),
4763            Some("update")
4764        );
4765        assert_eq!(
4766            event.get("collection").and_then(crate::json::Value::as_str),
4767            Some("users")
4768        );
4769        assert!(event
4770            .get("event_id")
4771            .and_then(crate::json::Value::as_str)
4772            .is_some_and(|v| !v.is_empty()));
4773        let before = event
4774            .get("before")
4775            .and_then(crate::json::Value::as_object)
4776            .expect("before must be an object");
4777        let after = event
4778            .get("after")
4779            .and_then(crate::json::Value::as_object)
4780            .expect("after must be an object");
4781        assert_eq!(
4782            before.get("name").and_then(crate::json::Value::as_str),
4783            Some("Alice"),
4784            "before.name should be the old value"
4785        );
4786        assert_eq!(
4787            after.get("name").and_then(crate::json::Value::as_str),
4788            Some("Bob"),
4789            "after.name should be the new value"
4790        );
4791    }
4792
4793    #[test]
4794    fn update_event_only_includes_changed_fields() {
4795        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4796        rt.execute_query(
4797            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4798        )
4799        .unwrap();
4800        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4801            .unwrap();
4802
4803        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4804            .unwrap();
4805
4806        let events = queue_payloads(&rt, "evts");
4807        assert_eq!(events.len(), 1);
4808        let event = events[0].as_object().unwrap();
4809        let before = event
4810            .get("before")
4811            .and_then(crate::json::Value::as_object)
4812            .unwrap();
4813        let after = event
4814            .get("after")
4815            .and_then(crate::json::Value::as_object)
4816            .unwrap();
4817        // Only changed field included.
4818        assert!(
4819            before.contains_key("name"),
4820            "before must include changed field"
4821        );
4822        assert!(
4823            after.contains_key("name"),
4824            "after must include changed field"
4825        );
4826        // Unchanged fields must not appear.
4827        assert!(
4828            !before.contains_key("email"),
4829            "before must not include unchanged email"
4830        );
4831        assert!(
4832            !after.contains_key("email"),
4833            "after must not include unchanged email"
4834        );
4835    }
4836
4837    #[test]
4838    fn multi_row_update_emits_one_event_per_row() {
4839        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4840        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4841            .unwrap();
4842        rt.execute_query(
4843            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4844        )
4845        .unwrap();
4846
4847        rt.execute_query("UPDATE items SET status = 'done'")
4848            .unwrap();
4849
4850        let events = queue_payloads(&rt, "evts");
4851        assert_eq!(events.len(), 3, "expected one update event per row");
4852        for event in &events {
4853            let obj = event.as_object().unwrap();
4854            assert_eq!(
4855                obj.get("op").and_then(crate::json::Value::as_str),
4856                Some("update")
4857            );
4858        }
4859    }
4860
4861    #[test]
4862    fn delete_single_row_emits_delete_event() {
4863        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4864        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4865            .unwrap();
4866        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4867            .unwrap();
4868
4869        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4870
4871        let events = queue_payloads(&rt, "del_log");
4872        assert_eq!(events.len(), 1);
4873        let event = events[0].as_object().expect("event payload object");
4874        assert_eq!(
4875            event.get("op").and_then(crate::json::Value::as_str),
4876            Some("delete")
4877        );
4878        assert_eq!(
4879            event.get("collection").and_then(crate::json::Value::as_str),
4880            Some("users")
4881        );
4882        assert!(event
4883            .get("event_id")
4884            .and_then(crate::json::Value::as_str)
4885            .is_some_and(|v| !v.is_empty()));
4886        let before = event
4887            .get("before")
4888            .and_then(crate::json::Value::as_object)
4889            .expect("before must be an object for delete");
4890        assert_eq!(
4891            before.get("id").and_then(crate::json::Value::as_u64),
4892            Some(42)
4893        );
4894        assert_eq!(
4895            before.get("name").and_then(crate::json::Value::as_str),
4896            Some("Alice")
4897        );
4898        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
4899    }
4900
4901    #[test]
4902    fn multi_row_delete_emits_one_event_per_row() {
4903        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4904        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
4905            .unwrap();
4906        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
4907            .unwrap();
4908
4909        rt.execute_query("DELETE FROM items").unwrap();
4910
4911        let events = queue_payloads(&rt, "del_log");
4912        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
4913        for event in &events {
4914            let obj = event.as_object().unwrap();
4915            assert_eq!(
4916                obj.get("op").and_then(crate::json::Value::as_str),
4917                Some("delete")
4918            );
4919            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
4920        }
4921    }
4922
4923    #[test]
4924    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
4925        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4926        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
4927            .unwrap();
4928
4929        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4930            .unwrap();
4931        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
4932
4933        let events = queue_payloads(&rt, "evts");
4934        assert!(
4935            events.is_empty(),
4936            "UPDATE-only filter must not emit INSERT or DELETE events"
4937        );
4938    }
4939
4940    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
4941
4942    #[test]
4943    fn suppress_events_on_insert_emits_no_events() {
4944        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4945        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4946            .unwrap();
4947
4948        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4949            .unwrap();
4950
4951        let events = queue_payloads(&rt, "evts");
4952        assert!(
4953            events.is_empty(),
4954            "SUPPRESS EVENTS must prevent INSERT events"
4955        );
4956    }
4957
4958    #[test]
4959    fn suppress_events_on_update_emits_no_events() {
4960        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4961        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4962            .unwrap();
4963        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4964            .unwrap();
4965        // drain the INSERT event
4966        let _ = queue_payloads(&rt, "evts");
4967        // Force pop to drain; simpler: just check new count after UPDATE
4968        rt.execute_query("QUEUE PURGE evts").unwrap();
4969
4970        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
4971            .unwrap();
4972
4973        let events = queue_payloads(&rt, "evts");
4974        assert!(
4975            events.is_empty(),
4976            "SUPPRESS EVENTS must prevent UPDATE events"
4977        );
4978    }
4979
4980    #[test]
4981    fn suppress_events_on_delete_emits_no_events() {
4982        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4983        rt.execute_query(
4984            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
4985        )
4986        .unwrap();
4987        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4988            .unwrap();
4989
4990        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
4991            .unwrap();
4992
4993        let events = queue_payloads(&rt, "evts");
4994        assert!(
4995            events.is_empty(),
4996            "SUPPRESS EVENTS must prevent DELETE events"
4997        );
4998    }
4999
5000    #[test]
5001    fn normal_insert_after_suppress_still_emits() {
5002        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
5003        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
5004            .unwrap();
5005
5006        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
5007            .unwrap();
5008        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
5009            .unwrap();
5010
5011        let events = queue_payloads(&rt, "evts");
5012        assert_eq!(
5013            events.len(),
5014            1,
5015            "only the non-suppressed INSERT should emit"
5016        );
5017        assert_eq!(
5018            events[0].get("id").and_then(crate::json::Value::as_u64),
5019            Some(2)
5020        );
5021    }
5022}