// reddb_server/runtime/impl_dml.rs
//! DML execution: INSERT, UPDATE, DELETE via SQL AST
//!
//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
//! operations through the existing `RuntimeEntityPort` trait so that all
//! cross-cutting concerns (WAL, indexing, replication) are automatically
//! applied.

9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput,
11    CreateEntityOutput, CreateKvInput, CreateNodeInput, CreateRowInput, CreateRowsBatchInput,
12    CreateVectorInput, DeleteEntityInput, PatchEntityOperation, PatchEntityOperationType,
13    RowUpdateColumnRule, RowUpdateContractPlan,
14};
15use crate::application::ports::{
16    build_row_update_contract_plan, entity_row_fields_snapshot,
17    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
18    RuntimeEntityPort,
19};
20use crate::application::ttl_payload::has_internal_ttl_metadata;
21use crate::presentation::entity_json::storage_value_to_json;
22use crate::storage::query::ast::{BinOp, Expr, FieldRef, ReturningItem, UpdateTarget};
23use crate::storage::query::sql_lowering::{
24    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
25};
26use crate::storage::query::unified::{
27    sys_key_collection, sys_key_created_at, sys_key_kind, sys_key_red_entity_id, sys_key_rid,
28    sys_key_tenant, sys_key_updated_at, UnifiedRecord, UnifiedResult,
29};
30use crate::storage::unified::MetadataValue;
31use crate::storage::Metadata;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use super::*;
36
37const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
38const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
39const TREE_METADATA_PREFIX: &str = "red.tree.";
40
41#[derive(Clone)]
42struct CompiledUpdateAssignment {
43    column: String,
44    expr: Expr,
45    compound_op: Option<BinOp>,
46    metadata_key: Option<&'static str>,
47    row_rule: Option<RowUpdateColumnRule>,
48}
49
50struct CompiledUpdatePlan {
51    static_field_assignments: Vec<(String, Value)>,
52    static_metadata_assignments: Vec<(String, MetadataValue)>,
53    dynamic_assignments: Vec<CompiledUpdateAssignment>,
54    row_contract_plan: Option<RowUpdateContractPlan>,
55    row_modified_columns: Vec<String>,
56    row_touches_unique_columns: bool,
57}
58
59#[derive(Default)]
60struct MaterializedUpdateAssignments {
61    dynamic_field_assignments: Vec<(String, Value)>,
62    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
63}
64
65impl RedDBRuntime {
66    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
67    /// target table is tenant-scoped and the user's column list does
68    /// not already name the tenant column.
69    ///
70    /// Returns:
71    /// * `Ok(None)` — no injection needed (non-tenant table, or user
72    ///   supplied the column explicitly). Caller uses the original
73    ///   query unchanged.
74    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
75    ///   + literal value appended to every row.
76    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
77    ///   the current session. Fails loudly so callers don't produce
78    ///   rows that RLS would then hide on read.
79    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
80        let Some(tenant_col) = self.tenant_column(&query.table) else {
81            return Ok(None);
82        };
83        // User already named the column (literal match) — trust them.
84        if query
85            .columns
86            .iter()
87            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
88        {
89            return Ok(None);
90        }
91
92        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
93        // nested key like `headers.tenant` we operate on the root
94        // column (`headers`) and set / add the nested path inside its
95        // JSON value. If the user named the root column we mutate in
96        // place; otherwise we create a fresh JSON column for every row.
97        if let Some(dot_pos) = tenant_col.find('.') {
98            let (root, tail) = tenant_col.split_at(dot_pos);
99            let tail = &tail[1..]; // drop leading '.'
100            return self.inject_dotted_tenant(query, root, tail);
101        }
102
103        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
104            return Err(RedDBError::Query(format!(
105                "INSERT into tenant-scoped table '{}' requires an active tenant — \
106                 run SET TENANT '<id>' first or name column '{}' explicitly",
107                query.table, tenant_col
108            )));
109        };
110
111        let mut augmented = query.clone();
112        augmented.columns.push(tenant_col);
113        let lit = Value::text(tenant_id.clone());
114        for row in augmented.values.iter_mut() {
115            row.push(lit.clone());
116        }
117        for row in augmented.value_exprs.iter_mut() {
118            row.push(crate::storage::query::ast::Expr::Literal {
119                value: lit.clone(),
120                span: crate::storage::query::ast::Span::synthetic(),
121            });
122        }
123        Ok(Some(augmented))
124    }
125
126    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
127    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
128    /// nested JSON instead of appending a flat column.
129    ///
130    /// Cases:
131    /// * Root column already in the INSERT list → mutate per-row JSON
132    ///   (parse, set path, re-serialize).
133    /// * Root column absent → create a fresh `{tail: tenant}` JSON
134    ///   object and append the root column to the INSERT.
135    fn inject_dotted_tenant(
136        &self,
137        query: &InsertQuery,
138        root: &str,
139        tail: &str,
140    ) -> RedDBResult<Option<InsertQuery>> {
141        let active_tenant = crate::runtime::impl_core::current_tenant();
142        let mut augmented = query.clone();
143        let root_idx = augmented
144            .columns
145            .iter()
146            .position(|c| c.eq_ignore_ascii_case(root));
147
148        if let Some(idx) = root_idx {
149            // User supplied the root column. Per-row: if the dotted
150            // tail is already present we trust the user (admin / bulk
151            // loader scenario); otherwise fill from the active
152            // tenant. An unbound tenant is only an error when some
153            // row actually needs filling.
154            for row in augmented.values.iter_mut() {
155                let Some(slot) = row.get_mut(idx) else {
156                    continue;
157                };
158                if dotted_tail_already_set(slot, tail) {
159                    continue;
160                }
161                let Some(tenant_id) = &active_tenant else {
162                    return Err(RedDBError::Query(format!(
163                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
164                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
165                        query.table, root, tail
166                    )));
167                };
168                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
169            }
170            // Expression row is kept in sync by re-wrapping the
171            // mutated literal; the canonical path will re-evaluate
172            // against the same JSON shape.
173            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
174                if let Some(slot) = row.get_mut(idx) {
175                    let new_value = augmented
176                        .values
177                        .get(row_idx)
178                        .and_then(|v| v.get(idx))
179                        .cloned()
180                        .unwrap_or(Value::Null);
181                    *slot = crate::storage::query::ast::Expr::Literal {
182                        value: new_value,
183                        span: crate::storage::query::ast::Span::synthetic(),
184                    };
185                }
186            }
187        } else {
188            // No root column in the INSERT list — auto-fill needs a
189            // bound tenant to synthesise one. Error loud so we never
190            // create a tenant-less row that RLS would then hide.
191            let Some(tenant_id) = &active_tenant else {
192                return Err(RedDBError::Query(format!(
193                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
194                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
195                    query.table, root, tail
196                )));
197            };
198            // Create a fresh JSON column with only the tenant path set.
199            augmented.columns.push(root.to_string());
200            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
201            for row in augmented.values.iter_mut() {
202                row.push(fresh.clone());
203            }
204            for row in augmented.value_exprs.iter_mut() {
205                row.push(crate::storage::query::ast::Expr::Literal {
206                    value: fresh.clone(),
207                    span: crate::storage::query::ast::Span::synthetic(),
208                });
209            }
210        }
211
212        Ok(Some(augmented))
213    }
214
215    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
216    /// `lsns` is empty because events fire at commit time.
217    fn delete_entities_batch(
218        &self,
219        collection: &str,
220        ids: &[EntityId],
221    ) -> RedDBResult<(u64, Vec<u64>)> {
222        if ids.is_empty() {
223            return Ok((0, vec![]));
224        }
225
226        let store = self.db().store();
227        let Some(manager) = store.get_collection(collection) else {
228            return Ok((0, vec![]));
229        };
230
231        let active_xid = self.current_xid();
232        let conn_id = crate::runtime::impl_core::current_connection_id();
233        let mut autocommit_xid = None;
234        let mut tombstoned_ids = Vec::new();
235        let mut tombstoned_entities = Vec::new();
236        let mut physical_delete_ids = Vec::new();
237        let table_row_resolver =
238            crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement();
239
240        for &id in ids {
241            let Some(mut entity) = manager.get(id) else {
242                continue;
243            };
244            if matches!(entity.data, EntityData::Row(_)) {
245                let previous_xmax = entity.xmax;
246                if matches!(entity.kind, crate::storage::EntityKind::TableRow { .. }) {
247                    if table_row_resolver.resolve_candidate(&entity).is_none() {
248                        continue;
249                    }
250                } else if entity.xmax != 0 {
251                    continue;
252                }
253
254                let xid = match active_xid {
255                    Some(xid) => xid,
256                    None => match autocommit_xid {
257                        Some(xid) => xid,
258                        None => {
259                            let mgr = self.snapshot_manager();
260                            let xid = mgr.begin();
261                            autocommit_xid = Some(xid);
262                            xid
263                        }
264                    },
265                };
266                entity.set_xmax(xid);
267                if manager.update(entity.clone()).is_ok() {
268                    if active_xid.is_some() {
269                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
270                    }
271                    tombstoned_entities.push(entity);
272                    tombstoned_ids.push(id);
273                }
274            } else {
275                physical_delete_ids.push(id);
276            }
277        }
278
279        if let Some(xid) = autocommit_xid {
280            self.snapshot_manager().commit(xid);
281        }
282
283        let mut affected = tombstoned_ids.len() as u64;
284        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
285        if active_xid.is_some() {
286            store
287                .persist_entities_to_pager(collection, &tombstoned_entities)
288                .map_err(|err| RedDBError::Internal(err.to_string()))?;
289        } else {
290            store
291                .persist_entities_to_pager(collection, &tombstoned_entities)
292                .map_err(|err| RedDBError::Internal(err.to_string()))?;
293            for id in &tombstoned_ids {
294                store.context_index().remove_entity(*id);
295                let lsn = self.cdc_emit(
296                    crate::replication::cdc::ChangeOperation::Delete,
297                    collection,
298                    id.raw(),
299                    "entity",
300                );
301                lsns.push(lsn);
302            }
303        }
304
305        let deleted_ids = store
306            .delete_batch(collection, &physical_delete_ids)
307            .map_err(|err| RedDBError::Internal(err.to_string()))?;
308        affected += deleted_ids.len() as u64;
309        for id in &deleted_ids {
310            store.context_index().remove_entity(*id);
311            let lsn = self.cdc_emit(
312                crate::replication::cdc::ChangeOperation::Delete,
313                collection,
314                id.raw(),
315                "entity",
316            );
317            lsns.push(lsn);
318        }
319
320        Ok((affected, lsns))
321    }
322
323    /// Flushes context-index updates and CDC for each applied mutation.
324    /// Returns one LSN per entity in the same order as `applied`.
325    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
326        if applied.is_empty() {
327            return Ok(Vec::new());
328        }
329
330        let store = self.db().store();
331        if applied.iter().any(|item| item.context_index_dirty) {
332            store.context_index().index_entities(
333                &applied[0].collection,
334                applied
335                    .iter()
336                    .filter(|item| item.context_index_dirty)
337                    .map(|item| &item.entity),
338            );
339        }
340
341        for item in applied {
342            self.refresh_update_secondary_indexes(item)?;
343        }
344
345        let mut lsns = Vec::with_capacity(applied.len());
346        for item in applied {
347            let lsn = self.cdc_emit_prebuilt(
348                crate::replication::cdc::ChangeOperation::Update,
349                &item.collection,
350                &item.entity,
351                update_cdc_item_kind(self, &item.collection, &item.entity),
352                item.metadata.as_ref(),
353                false,
354            );
355            lsns.push(lsn);
356        }
357        Ok(lsns)
358    }
359
360    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
361        self.persist_applied_entity_mutations(applied)
362    }
363
364    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
365        if applied.pre_mutation_fields.is_empty() {
366            return Ok(());
367        }
368        let post = entity_row_fields_snapshot(&applied.entity);
369        if post.is_empty() {
370            return Ok(());
371        }
372
373        let indexed_cols = self
374            .index_store_ref()
375            .indexed_columns_set(&applied.collection);
376        if indexed_cols.is_empty() {
377            return Ok(());
378        }
379
380        if let Some(old_version) = applied.replaced_entity.as_ref() {
381            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
382                .pre_mutation_fields
383                .iter()
384                .filter(|(col, _)| indexed_cols.contains(col))
385                .cloned()
386                .collect();
387            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
388                .iter()
389                .filter(|(col, _)| indexed_cols.contains(col))
390                .cloned()
391                .collect();
392            if !old_index_fields.is_empty() {
393                self.index_store_ref()
394                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
395                    .map_err(crate::RedDBError::Internal)?;
396            }
397            if !new_index_fields.is_empty() {
398                self.index_store_ref()
399                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
400                    .map_err(crate::RedDBError::Internal)?;
401            }
402            return Ok(());
403        }
404
405        let damage =
406            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
407        if damage
408            .touched_columns()
409            .into_iter()
410            .any(|col| indexed_cols.contains(col))
411        {
412            self.index_store_ref()
413                .index_entity_update(
414                    &applied.collection,
415                    applied.id,
416                    &applied.pre_mutation_fields,
417                    &post,
418                )
419                .map_err(crate::RedDBError::Internal)?;
420        }
421        Ok(())
422    }
423
424    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
425    ///
426    /// Each row in `query.values` is zipped with `query.columns` to produce a
427    /// set of named fields, which is then dispatched based on entity_type.
428    pub fn execute_insert(
429        &self,
430        raw_query: &str,
431        query: &InsertQuery,
432    ) -> RedDBResult<RuntimeQueryResult> {
433        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
434        // CollectionContract gate (#49): single entry point for the
435        // operator's collection-level write rules. Today this is a
436        // no-op for INSERT (APPEND ONLY permits insert); routing
437        // through the gate now means future contract bits — versioned,
438        // vault-only writes — plug in once instead of per verb.
439        crate::runtime::collection_contract::CollectionContractGate::check(
440            self,
441            &query.table,
442            crate::runtime::collection_contract::MutationKind::Insert,
443        )?;
444        // Phase 2.5.4 table-scoped tenancy: if the target table is
445        // tenant-scoped and the user didn't name the tenant column,
446        // auto-inject it with the thread-local `CURRENT_TENANT()`
447        // value. When the column is named explicitly we trust the
448        // caller (useful for admin tooling that writes on behalf of
449        // specific tenants). An unbound tenant on an implicit-fill
450        // path errors up front rather than producing a row the RLS
451        // policy would silently hide.
452        let augmented_owned;
453        let query = match self.maybe_inject_tenant_column(query)? {
454            Some(new_q) => {
455                augmented_owned = new_q;
456                &augmented_owned
457            }
458            None => query,
459        };
460        self.check_insert_column_policy(query)?;
461
462        let mut inserted_count: u64 = 0;
463        let effective_rows =
464            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
465
466        // Ensure the collection exists (auto-create on first insert).
467        let store = self.inner.db.store();
468        let _ = store.get_or_create_collection(&query.table);
469        let declared_model = self
470            .db()
471            .collection_contract_arc(&query.table)
472            .map(|contract| contract.declared_model);
473
474        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
475            if query.returning.is_some() {
476                Some(Vec::with_capacity(effective_rows.len()))
477            } else {
478                None
479            };
480        let mut returning_result: Option<UnifiedResult> = None;
481
482        if matches!(query.entity_type, InsertEntityType::Row)
483            && !matches!(
484                declared_model,
485                Some(crate::catalog::CollectionModel::TimeSeries)
486            )
487        {
488            let mut rows = Vec::with_capacity(effective_rows.len());
489            for row_values in &effective_rows {
490                if row_values.len() != query.columns.len() {
491                    return Err(RedDBError::Query(format!(
492                        "INSERT column count ({}) does not match value count ({})",
493                        query.columns.len(),
494                        row_values.len()
495                    )));
496                }
497                let (fields, mut metadata) =
498                    split_insert_metadata(self, &query.columns, row_values)?;
499                merge_with_clauses(
500                    &mut metadata,
501                    query.ttl_ms,
502                    query.expires_at_ms,
503                    &query.with_metadata,
504                );
505                if let Some(snaps) = returning_snapshots.as_mut() {
506                    snaps.push(fields.clone());
507                }
508                rows.push(CreateRowInput {
509                    collection: query.table.clone(),
510                    fields,
511                    metadata,
512                    node_links: Vec::new(),
513                    vector_links: Vec::new(),
514                });
515            }
516            let outputs = self.create_rows_batch(CreateRowsBatchInput {
517                collection: query.table.clone(),
518                rows,
519                suppress_events: query.suppress_events,
520            })?;
521            inserted_count = outputs.len() as u64;
522
523            // Hypertable chunk routing: if this table was declared via
524            // CREATE HYPERTABLE, register each row's time-column value
525            // with the registry so chunk metadata (bounds, row counts,
526            // TTL eligibility) stays current. This is what lets
527            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
528            // retention daemon sweep expired chunks without scanning
529            // every row.
530            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
531                let time_col = &spec.time_column;
532                // Find the column's index in the INSERT column list.
533                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
534                    for row in &effective_rows {
535                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
536                            if *n >= 0 {
537                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
538                            }
539                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
540                            let _ = self.inner.db.hypertables().route(&query.table, *n);
541                        }
542                    }
543                }
544            }
545
546            if let (Some(items), Some(snaps)) =
547                (query.returning.as_ref(), returning_snapshots.take())
548            {
549                let snaps = row_insert_returning_snapshots(&outputs, snaps);
550                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
551            }
552        } else {
553            // Issue #419: surface the inserted entity id on every INSERT path.
554            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
555            // so a RETURNING clause (and the unconditional inserted_ids list,
556            // below) can expose the engine-assigned id. TimeSeries (the row
557            // branch in this else) still returns the not-supported error
558            // because create_timeseries_point isn't plumbed through this fn.
559            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
560                Vec::with_capacity(effective_rows.len());
561            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
562            {
563                Vec::with_capacity(effective_rows.len())
564            } else {
565                Vec::new()
566            };
567            if matches!(
568                query.entity_type,
569                InsertEntityType::Node | InsertEntityType::Edge
570            ) {
571                enum PreparedGraphInsert {
572                    Node {
573                        fields: Vec<(String, Value)>,
574                        input: CreateNodeInput,
575                    },
576                    Edge {
577                        fields: Vec<(String, Value)>,
578                        input: CreateEdgeInput,
579                    },
580                }
581
582                let mut prepared = Vec::with_capacity(effective_rows.len());
583                for row_values in &effective_rows {
584                    if row_values.len() != query.columns.len() {
585                        return Err(RedDBError::Query(format!(
586                            "INSERT column count ({}) does not match value count ({})",
587                            query.columns.len(),
588                            row_values.len()
589                        )));
590                    }
591
592                    match query.entity_type {
593                        InsertEntityType::Node => {
594                            let (node_values, mut metadata) =
595                                split_insert_metadata(self, &query.columns, row_values)?;
596                            merge_with_clauses(
597                                &mut metadata,
598                                query.ttl_ms,
599                                query.expires_at_ms,
600                                &query.with_metadata,
601                            );
602                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
603                            apply_collection_default_ttl_metadata(
604                                self,
605                                &query.table,
606                                &mut metadata,
607                            );
608                            let (columns, values) = pairwise_columns_values(&node_values);
609                            let label = find_column_value_string(&columns, &values, "label")?;
610                            let node_type =
611                                find_column_value_opt_string(&columns, &values, "node_type");
612                            let properties = extract_remaining_properties(
613                                &columns,
614                                &values,
615                                &["label", "node_type"],
616                            );
617                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
618                                properties.iter().map(|(key, _)| key.as_str()),
619                                &format!("node '{}'", query.table),
620                            )?;
621                            prepared.push(PreparedGraphInsert::Node {
622                                fields: node_values,
623                                input: CreateNodeInput {
624                                    collection: query.table.clone(),
625                                    label,
626                                    node_type,
627                                    properties,
628                                    metadata,
629                                    embeddings: Vec::new(),
630                                    table_links: Vec::new(),
631                                    node_links: Vec::new(),
632                                },
633                            });
634                        }
635                        InsertEntityType::Edge => {
636                            let (edge_values, mut metadata) =
637                                split_insert_metadata(self, &query.columns, row_values)?;
638                            merge_with_clauses(
639                                &mut metadata,
640                                query.ttl_ms,
641                                query.expires_at_ms,
642                                &query.with_metadata,
643                            );
644                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
645                            apply_collection_default_ttl_metadata(
646                                self,
647                                &query.table,
648                                &mut metadata,
649                            );
650                            let (columns, values) = pairwise_columns_values(&edge_values);
651                            let label = find_column_value_string(&columns, &values, "label")?;
652                            ensure_non_tree_structural_edge_label(&label)?;
653                            let from_id = resolve_edge_endpoint_any(
654                                self.inner.db.store().as_ref(),
655                                &query.table,
656                                &columns,
657                                &values,
658                                &["from_rid", "from"],
659                            )?;
660                            let to_id = resolve_edge_endpoint_any(
661                                self.inner.db.store().as_ref(),
662                                &query.table,
663                                &columns,
664                                &values,
665                                &["to_rid", "to"],
666                            )?;
667                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
668                            let properties = extract_remaining_properties(
669                                &columns,
670                                &values,
671                                &["label", "from_rid", "to_rid", "from", "to", "weight"],
672                            );
673                            crate::reserved_fields::ensure_no_reserved_public_item_fields(
674                                properties.iter().map(|(key, _)| key.as_str()),
675                                &format!("edge '{}'", query.table),
676                            )?;
677                            prepared.push(PreparedGraphInsert::Edge {
678                                fields: edge_values,
679                                input: CreateEdgeInput {
680                                    collection: query.table.clone(),
681                                    label,
682                                    from: EntityId::new(from_id),
683                                    to: EntityId::new(to_id),
684                                    weight,
685                                    properties,
686                                    metadata,
687                                },
688                            });
689                        }
690                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
691                    }
692                }
693
694                ensure_graph_insert_contract(self, &query.table)?;
695                let mut batch = self.inner.db.batch();
696                for item in prepared {
697                    match item {
698                        PreparedGraphInsert::Node { fields, input } => {
699                            if query.returning.is_some() {
700                                returning_field_snaps.push(fields);
701                            }
702                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
703                            batch = batch.add_node_with_type(
704                                input.collection,
705                                input.label,
706                                node_type,
707                                input.properties.into_iter().collect(),
708                                input.metadata.into_iter().collect(),
709                            );
710                        }
711                        PreparedGraphInsert::Edge { fields, input } => {
712                            if query.returning.is_some() {
713                                returning_field_snaps.push(fields);
714                            }
715                            batch = batch.add_edge(
716                                input.collection,
717                                input.label,
718                                input.from,
719                                input.to,
720                                input.weight.unwrap_or(1.0),
721                                input.properties.into_iter().collect(),
722                                input.metadata.into_iter().collect(),
723                            );
724                        }
725                    }
726                }
727                let batch_result = batch
728                    .execute()
729                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
730                let (ids, entity_kind) = match query.entity_type {
731                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
732                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
733                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
734                };
735                for id in &ids {
736                    self.stamp_xmin_if_in_txn(&query.table, *id);
737                }
738                if query.returning.is_some() {
739                    returning_field_snaps = graph_insert_returning_snapshots(
740                        self.inner.db.store().as_ref(),
741                        &query.table,
742                        &ids,
743                    );
744                }
745                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
746                let store = self.inner.db.store();
747                entity_outputs.extend(ids.iter().map(|id| {
748                    crate::application::entity::CreateEntityOutput {
749                        id: *id,
750                        entity: store.get(&query.table, *id),
751                    }
752                }));
753                inserted_count = ids.len() as u64;
754            } else {
755                for row_values in &effective_rows {
756                    if row_values.len() != query.columns.len() {
757                        return Err(RedDBError::Query(format!(
758                            "INSERT column count ({}) does not match value count ({})",
759                            query.columns.len(),
760                            row_values.len()
761                        )));
762                    }
763
764                    match query.entity_type {
765                        InsertEntityType::Row => {
766                            if query.returning.is_some() {
767                                return Err(RedDBError::Query(
768                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
769                                    .to_string(),
770                            ));
771                            }
772                            let (fields, mut metadata) =
773                                split_insert_metadata(self, &query.columns, row_values)?;
774                            merge_with_clauses(
775                                &mut metadata,
776                                query.ttl_ms,
777                                query.expires_at_ms,
778                                &query.with_metadata,
779                            );
780                            self.insert_timeseries_point(&query.table, fields, metadata)?;
781                        }
782                        InsertEntityType::Node | InsertEntityType::Edge => {
783                            unreachable!("NODE and EDGE are handled by the prepared graph path")
784                        }
785                        InsertEntityType::Vector => {
786                            let (vector_values, mut metadata) =
787                                split_insert_metadata(self, &query.columns, row_values)?;
788                            merge_with_clauses(
789                                &mut metadata,
790                                query.ttl_ms,
791                                query.expires_at_ms,
792                                &query.with_metadata,
793                            );
794                            let (columns, values) = pairwise_columns_values(&vector_values);
795                            let dense = find_column_value_vec_f32_any(
796                                &columns,
797                                &values,
798                                &["dense", "embedding"],
799                            )?;
800                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
801                            let content =
802                                find_column_value_opt_string(&columns, &values, "content");
803                            if query.returning.is_some() {
804                                returning_field_snaps.push(vector_values.clone());
805                            }
806                            let input = CreateVectorInput {
807                                collection: query.table.clone(),
808                                dense,
809                                content,
810                                metadata,
811                                link_row: None,
812                                link_node: None,
813                            };
814                            entity_outputs.push(self.create_vector(input)?);
815                        }
816                        InsertEntityType::Document => {
817                            let (document_values, mut metadata) =
818                                split_insert_metadata(self, &query.columns, row_values)?;
819                            merge_with_clauses(
820                                &mut metadata,
821                                query.ttl_ms,
822                                query.expires_at_ms,
823                                &query.with_metadata,
824                            );
825                            let (columns, values) = pairwise_columns_values(&document_values);
826                            let body_str = find_column_value_string(&columns, &values, "body")?;
827                            let body: crate::json::Value = crate::json::from_str(&body_str)
828                                .map_err(|e| {
829                                    RedDBError::Query(format!("invalid JSON body: {e}"))
830                                })?;
831                            let input = CreateDocumentInput {
832                                collection: query.table.clone(),
833                                body,
834                                metadata,
835                                node_links: Vec::new(),
836                                vector_links: Vec::new(),
837                            };
838                            let output = self.create_document(input)?;
839                            if query.returning.is_some() {
840                                let fields = output
841                                    .entity
842                                    .as_ref()
843                                    .map(entity_row_fields_snapshot)
844                                    .filter(|fields| !fields.is_empty())
845                                    .unwrap_or(document_values);
846                                returning_field_snaps.push(fields);
847                            }
848                            entity_outputs.push(output);
849                        }
850                        InsertEntityType::Kv => {
851                            let (kv_values, mut metadata) =
852                                split_insert_metadata(self, &query.columns, row_values)?;
853                            merge_with_clauses(
854                                &mut metadata,
855                                query.ttl_ms,
856                                query.expires_at_ms,
857                                &query.with_metadata,
858                            );
859                            let (columns, values) = pairwise_columns_values(&kv_values);
860                            let key = find_column_value_string(&columns, &values, "key")?;
861                            let value = find_column_value(&columns, &values, "value")?;
862                            if query.returning.is_some() {
863                                returning_field_snaps.push(kv_values.clone());
864                            }
865                            let input = CreateKvInput {
866                                collection: query.table.clone(),
867                                key,
868                                value,
869                                metadata,
870                            };
871                            entity_outputs.push(self.create_kv(input)?);
872                        }
873                    }
874
875                    inserted_count += 1;
876                }
877            }
878
879            if let Some(items) = query.returning.as_ref() {
880                if !entity_outputs.is_empty() {
881                    returning_result = Some(build_returning_result(
882                        items,
883                        &returning_field_snaps,
884                        Some(&entity_outputs),
885                    ));
886                }
887            }
888        }
889
890        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
891        if let Some(ref embed_config) = query.auto_embed {
892            let store = self.inner.db.store();
893            let provider = crate::ai::parse_provider(&embed_config.provider)?;
894            let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
895            let model = embed_config.model.clone().unwrap_or_else(|| {
896                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
897                    .ok()
898                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
899            });
900
901            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
902            let manager = store
903                .get_collection(&query.table)
904                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
905            let entities = manager.query_all(|_| true);
906            let recent: Vec<_> = entities
907                .into_iter()
908                .rev()
909                .take(effective_rows.len())
910                .collect();
911
912            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
913            let entity_combos: Vec<(usize, String)> = recent
914                .iter()
915                .enumerate()
916                .filter_map(|(i, entity)| {
917                    if let EntityData::Row(ref row) = entity.data {
918                        if let Some(ref named) = row.named {
919                            let texts: Vec<String> = embed_config
920                                .fields
921                                .iter()
922                                .filter_map(|field| match named.get(field) {
923                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
924                                    _ => None,
925                                })
926                                .collect();
927                            if !texts.is_empty() {
928                                return Some((i, texts.join(" ")));
929                            }
930                        }
931                    }
932                    None
933                })
934                .collect();
935
936            if !entity_combos.is_empty() {
937                // Batch phase: single provider round-trip for all rows.
938                let batch_texts: Vec<String> =
939                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
940
941                let batch_client =
942                    crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
943
944                let embeddings = match tokio::runtime::Handle::try_current() {
945                    Ok(handle) => tokio::task::block_in_place(|| {
946                        handle.block_on(batch_client.embed_batch(
947                            &provider,
948                            &model,
949                            &api_key,
950                            batch_texts,
951                        ))
952                    }),
953                    Err(_) => {
954                        return Err(RedDBError::Query(
955                            "AUTO EMBED requires a Tokio runtime context".to_string(),
956                        ));
957                    }
958                }
959                .map_err(|e| RedDBError::Query(e.to_string()))?;
960
961                // Distribute phase: persist one vector per non-empty embedding.
962                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
963                    if dense.is_empty() {
964                        continue;
965                    }
966                    self.create_vector(CreateVectorInput {
967                        collection: query.table.clone(),
968                        dense,
969                        content: Some(combined.clone()),
970                        metadata: Vec::new(),
971                        link_row: None,
972                        link_node: None,
973                    })?;
974                }
975            }
976        }
977
978        if inserted_count > 0 {
979            self.note_table_write(&query.table);
980        }
981
982        let mut result = RuntimeQueryResult::dml_result(
983            raw_query.to_string(),
984            inserted_count,
985            "insert",
986            "runtime-dml",
987        );
988        if let Some(returning) = returning_result {
989            result.result = returning;
990        }
991        Ok(result)
992    }
993
994    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
995        let Some(auth_store) = self.inner.auth_store.read().clone() else {
996            return Ok(());
997        };
998        if !auth_store.iam_authorization_enabled() {
999            return Ok(());
1000        }
1001        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
1002            return Ok(());
1003        };
1004
1005        let tenant = crate::runtime::impl_core::current_tenant();
1006        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
1007        let request = crate::auth::ColumnAccessRequest {
1008            action: "insert".to_string(),
1009            schema: None,
1010            table: query.table.clone(),
1011            columns: query.columns.clone(),
1012        };
1013        let ctx = crate::auth::policies::EvalContext {
1014            principal_tenant: tenant.clone(),
1015            current_tenant: tenant,
1016            peer_ip: None,
1017            mfa_present: false,
1018            now_ms: crate::auth::now_ms(),
1019            principal_is_admin_role: role == crate::auth::Role::Admin,
1020        };
1021
1022        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
1023        let table_allowed = matches!(
1024            outcome.table_decision,
1025            crate::auth::policies::Decision::Allow { .. }
1026                | crate::auth::policies::Decision::AdminBypass
1027        );
1028        if !table_allowed {
1029            return Err(RedDBError::Query(format!(
1030                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1031                outcome.table_resource.kind, outcome.table_resource.name
1032            )));
1033        }
1034        if let Some(denied) = outcome.first_denied_column() {
1035            return Err(RedDBError::Query(format!(
1036                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1037                denied.resource.kind, denied.resource.name
1038            )));
1039        }
1040
1041        Ok(())
1042    }
1043
1044    pub(crate) fn insert_timeseries_point(
1045        &self,
1046        collection: &str,
1047        fields: Vec<(String, Value)>,
1048        mut metadata: Vec<(String, MetadataValue)>,
1049    ) -> RedDBResult<EntityId> {
1050        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
1051
1052        let (columns, values) = pairwise_columns_values(&fields);
1053        validate_timeseries_insert_columns(&columns)?;
1054
1055        let metric = find_column_value_string(&columns, &values, "metric")?;
1056        let value = find_column_value_f64(&columns, &values, "value")?;
1057        let timestamp_ns =
1058            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
1059        let tags = find_timeseries_tags(&columns, &values)?;
1060
1061        let mut entity = UnifiedEntity::new(
1062            EntityId::new(0),
1063            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
1064                series: collection.to_string(),
1065                metric: metric.clone(),
1066            })),
1067            EntityData::TimeSeries(crate::storage::TimeSeriesData {
1068                metric,
1069                timestamp_ns,
1070                value,
1071                tags,
1072            }),
1073        );
1074        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
1075        // or an autocommit xid (allocated and committed up-front so
1076        // future snapshots see the row as soon as it lands).
1077        let writer_xid = match self.current_xid() {
1078            Some(xid) => xid,
1079            None => {
1080                let mgr = self.snapshot_manager();
1081                let xid = mgr.begin();
1082                mgr.commit(xid);
1083                xid
1084            }
1085        };
1086        entity.set_xmin(writer_xid);
1087
1088        let store = self.inner.db.store();
1089        let id = store
1090            .insert_auto(collection, entity)
1091            .map_err(|err| RedDBError::Internal(err.to_string()))?;
1092
1093        if !metadata.is_empty() {
1094            let _ = store.set_metadata(
1095                collection,
1096                id,
1097                Metadata::with_fields(metadata.into_iter().collect()),
1098            );
1099        }
1100
1101        self.cdc_emit(
1102            crate::replication::cdc::ChangeOperation::Insert,
1103            collection,
1104            id.raw(),
1105            "timeseries",
1106        );
1107
1108        Ok(id)
1109    }
1110
    /// Execute UPDATE table SET col=val, ... WHERE filter
    ///
    /// Scans the target collection, evaluates the WHERE filter against each
    /// record, and patches every matching entity.
    ///
    /// Ordering here is deliberate: write gate → collection-contract gate →
    /// target-contract check → RLS augmentation → RETURNING wrapper → inner
    /// executor. Each gate fails fast with an error that names its own
    /// concern rather than a downstream symptom.
    pub fn execute_update(
        &self,
        raw_query: &str,
        query: &UpdateQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // CollectionContract gate (#50): runs the APPEND ONLY guard
        // (and any future contract bits) before RLS / RETURNING work
        // so the operator's immutability declaration is honoured
        // uniformly and the error message points at the DDL rather
        // than at a downstream symptom.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Update,
        )?;
        ensure_update_target_contract(self, &query.table, query.target)?;

        // Apply RLS augmentation first so every downstream path — plain
        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
        // same policy-filtered target set. This prevents RETURNING
        // from ever exposing rows the UPDATE policy would have
        // denied.
        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
        // `augmented_query` outlives the borrow in `effective_query`; the
        // two-binding pattern avoids cloning `query` on the non-RLS path.
        let augmented_query: UpdateQuery;
        let effective_query: &UpdateQuery = if rls_gated {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Update,
            );
            let Some(policy) = rls_filter else {
                // No admitting policy: zero rows affected, empty
                // RETURNING (never leak rows the caller can't touch).
                let mut response = RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "update",
                    "runtime-dml-rls",
                );
                if let Some(items) = query.returning.clone() {
                    response.result = build_returning_result(&items, &[], None);
                }
                return Ok(response);
            };
            // AND the policy filter onto any user-supplied WHERE clause.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            augmented_query = augmented;
            &augmented_query
        } else {
            query
        };

        // RETURNING wraps the inner executor and uses the touched-id
        // list the inner reports so the post-image reflects exactly
        // the rows the UPDATE actually mutated (not whatever a
        // separate SELECT might have observed).
        if let Some(items) = effective_query.returning.clone() {
            // Run the update with RETURNING stripped, then snapshot the
            // post-images of exactly the ids the inner executor touched.
            let mut inner_query = effective_query.clone();
            inner_query.returning = None;
            let (mut response, touched_ids) =
                self.execute_update_inner_tracked(raw_query, &inner_query)?;

            // Graph targets (nodes/edges) snapshot differently from rows.
            let snapshots = if matches!(
                effective_query.target,
                UpdateTarget::Nodes | UpdateTarget::Edges
            ) {
                graph_update_returning_snapshots(self, &effective_query.table, &touched_ids)
            } else {
                super::dml_target_scan::DmlTargetScan::new(self, &effective_query.table, None, None)
                    .row_snapshots(&touched_ids)
            };

            response.result = build_returning_result(&items, &snapshots, None);
            response.engine = "runtime-dml-returning";
            return Ok(response);
        }

        self.execute_update_inner(raw_query, effective_query)
    }
1200
1201    /// Back-compat shim: the older entry point ignored touched ids.
1202    fn execute_update_inner(
1203        &self,
1204        raw_query: &str,
1205        query: &UpdateQuery,
1206    ) -> RedDBResult<RuntimeQueryResult> {
1207        self.execute_update_inner_tracked(raw_query, query)
1208            .map(|(res, _)| res)
1209    }
1210
    /// Core UPDATE executor: resolves the target ids, applies the compiled
    /// assignment plan to each matching entity in chunks, and persists /
    /// flushes / emits events per chunk. Returns the DML result together
    /// with the ids of every entity actually mutated (consumed by the
    /// RETURNING wrapper in `execute_update`).
    ///
    /// Updates that need read-modify-write locking are delegated to
    /// `execute_update_inner_tracked_locked`.
    fn execute_update_inner_tracked(
        &self,
        raw_query: &str,
        query: &UpdateQuery,
    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
        let store = self.inner.db.store();
        let effective_filter = effective_update_filter(query);
        let compiled_plan = self.compile_update_plan(query)?;
        let mut touched_ids: Vec<EntityId> = Vec::new();
        let limit_cap = query.limit.map(|l| l as usize);
        let manager = store
            .get_collection(&query.table)
            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
        // With ORDER BY the limit must be applied after sorting, so the
        // scan itself runs unbounded; without ORDER BY the scan can stop
        // early at the limit.
        let scan_limit = if query.order_by.is_empty() {
            limit_cap
        } else {
            None
        };
        let ids_to_update = super::dml_target_scan::DmlTargetScan::with_update_target(
            self,
            &query.table,
            effective_filter.as_ref(),
            scan_limit,
            query.target,
        )
        .find_target_ids()?;
        // Sort (and cap) the target ids when an ORDER BY is present.
        let ids_to_update = if query.order_by.is_empty() {
            ids_to_update
        } else {
            ordered_update_target_ids(&manager, &ids_to_update, &query.order_by, limit_cap)
        };

        // RMW-sensitive updates (e.g. compound assignments) take the
        // locked path so concurrent writers can't interleave.
        if update_needs_rmw_lock(query) {
            return self.execute_update_inner_tracked_locked(
                raw_query,
                query,
                &compiled_plan,
                &ids_to_update,
                effective_filter.as_ref(),
            );
        }

        let mut affected: u64 = 0;
        // Process in fixed-size chunks: materialize + apply each entity's
        // assignments, then persist, flush, and (unless suppressed) emit
        // events once per chunk.
        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
            let mut applied_chunk = Vec::with_capacity(chunk.len());
            for entity in manager.get_many(chunk).into_iter().flatten() {
                let assignments =
                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
                let applied = self.apply_materialized_update_for_entity(
                    query.table.clone(),
                    entity,
                    &compiled_plan,
                    assignments,
                )?;
                touched_ids.push(applied.id);
                applied_chunk.push(applied);
            }
            self.persist_update_chunk(&applied_chunk)?;
            affected += applied_chunk.len() as u64;
            let lsns = self.flush_update_chunk(&applied_chunk)?;
            if !query.suppress_events {
                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
            }
        }

        if affected > 0 {
            self.note_table_write(&query.table);
        }

        Ok((
            RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "update",
                "runtime-dml",
            ),
            touched_ids,
        ))
    }
1290
    /// Read-modify-write UPDATE path: takes per-row locks for every target,
    /// then re-reads and re-filters each row under the lock before applying
    /// the compiled plan, so concurrent writers cannot interleave between
    /// the read and the write.
    fn execute_update_inner_tracked_locked(
        &self,
        raw_query: &str,
        query: &UpdateQuery,
        compiled_plan: &CompiledUpdatePlan,
        ids_to_update: &[EntityId],
        effective_filter: Option<&Filter>,
    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
        let store = self.inner.db.store();
        let mut touched_ids = Vec::new();
        let mut lock_entries = Vec::new();

        // Collect (lock key, logical id, lock handle) for every target row
        // that still exists; locks are keyed by logical id so versions of
        // the same row share one lock.
        for id in ids_to_update {
            let Some(candidate) = store.get(&query.table, *id) else {
                continue;
            };
            let logical_id = candidate.logical_id();
            let lock_key = format!("row:{}", logical_id.raw());
            let rmw_lock = self.inner.rmw_locks.lock_for(&query.table, &lock_key);
            lock_entries.push((lock_key, logical_id, rmw_lock));
        }

        // Sort + dedup by key so every statement acquires its locks in the
        // same (deterministic) order and never locks the same row twice.
        lock_entries.sort_by(|left, right| left.0.cmp(&right.0));
        lock_entries.dedup_by(|left, right| left.0 == right.0);
        // Guards live until the end of the function (named binding, not `_`),
        // holding every row lock for the duration of the update.
        let _rmw_guards: Vec<_> = lock_entries.iter().map(|entry| entry.2.lock()).collect();

        let mut applied_chunk = Vec::new();
        for (_, logical_id, _) in &lock_entries {
            // Re-read under the lock; the row may have vanished since the scan.
            let Some(entity) = resolve_update_entity_by_logical_id(self, &query.table, *logical_id)
            else {
                continue;
            };
            // Re-evaluate the filter under the lock: a concurrent write may
            // have changed the row so it no longer matches.
            if let Some(filter) = effective_filter {
                if !crate::runtime::query_exec::evaluate_entity_filter_with_db(
                    Some(self.inner.db.as_ref()),
                    &entity,
                    filter,
                    &query.table,
                    &query.table,
                ) {
                    continue;
                }
            }

            let assignments =
                self.materialize_update_assignments_for_entity(query, &entity, compiled_plan)?;
            let applied = self.apply_materialized_update_for_entity(
                query.table.clone(),
                entity,
                compiled_plan,
                assignments,
            )?;
            touched_ids.push(applied.id);
            applied_chunk.push(applied);
        }

        // Single persist/flush/emit pass (no chunking on this path).
        let affected = applied_chunk.len() as u64;
        if !applied_chunk.is_empty() {
            self.persist_update_chunk(&applied_chunk)?;
            let lsns = self.flush_update_chunk(&applied_chunk)?;
            if !query.suppress_events {
                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
            }
        }

        if affected > 0 {
            self.note_table_write(&query.table);
        }

        Ok((
            RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "update",
                "runtime-dml",
            ),
            touched_ids,
        ))
    }
1370
    /// Compile the SET clause of an UPDATE into a `CompiledUpdatePlan`.
    ///
    /// Assignments whose right-hand side folds to a constant are resolved
    /// once here ("static"); anything that must read the current row stays
    /// "dynamic" and is evaluated per entity later. Columns recognised by
    /// `resolve_sql_ttl_metadata_key` are routed to metadata assignments
    /// instead of row-field writes. Statement-level TTL/metadata riders
    /// (`query.ttl_ms`, `query.expires_at_ms`, `query.with_metadata`) are
    /// always folded into the static metadata set.
    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
        let mut static_field_assignments = Vec::new();
        let mut static_metadata_assignments = Vec::new();
        let mut dynamic_assignments = Vec::new();
        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
        // Columns written by this UPDATE; used below to decide whether any
        // unique-indexed column is touched.
        let mut row_modified_columns = Vec::new();

        for (idx, (column, expr)) in query.assignment_exprs.iter().enumerate() {
            // Compound assignments need the row's current value, so they can
            // never be folded statically; TTL metadata keys do not support
            // them at all.
            let compound_op = query.compound_assignment_ops.get(idx).copied().flatten();
            let metadata_key = resolve_sql_ttl_metadata_key(column);
            if compound_op.is_some() && metadata_key.is_some() {
                return Err(RedDBError::Query(format!(
                    "compound assignment is only supported for row fields: {column}"
                )));
            }
            if compound_op.is_none() {
                // Constant-foldable RHS: resolve it once for all rows.
                if let Ok(value) = fold_expr_to_value(expr.clone()) {
                    if let Some(metadata_key) = metadata_key {
                        let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
                        let (canonical_key, canonical_value) =
                            canonicalize_sql_ttl_metadata(metadata_key, raw_value);
                        static_metadata_assignments
                            .push((canonical_key.to_string(), canonical_value));
                    } else {
                        let value = self.resolve_crypto_sentinel(value)?;
                        static_field_assignments.push((
                            column.clone(),
                            normalize_row_update_assignment_with_plan(
                                &query.table,
                                column,
                                value,
                                row_contract_plan.as_ref(),
                            )?,
                        ));
                        row_modified_columns.push(column.clone());
                    }
                    continue;
                }
            }

            // Row-dependent (or compound) assignment: keep the AST plus the
            // per-column contract rule for later per-entity evaluation.
            dynamic_assignments.push(CompiledUpdateAssignment {
                column: column.clone(),
                expr: expr.clone(),
                compound_op,
                metadata_key,
                row_rule: if metadata_key.is_none() {
                    if let Some(plan) = row_contract_plan.as_ref() {
                        // Contract-managed timestamps are engine-owned; a
                        // manual SET on them is rejected outright.
                        if plan.timestamps_enabled
                            && (column == "created_at" || column == "updated_at")
                        {
                            return Err(RedDBError::Query(format!(
                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
                                query.table, column
                            )));
                        }
                        if let Some(rule) = plan.declared_rules.get(column) {
                            Some(rule.clone())
                        } else if plan.strict_schema {
                            return Err(RedDBError::Query(format!(
                                "collection '{}' is strict and does not allow undeclared fields: {}",
                                query.table, column
                            )));
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else {
                    None
                },
            });
            if metadata_key.is_none() {
                row_modified_columns.push(column.clone());
            }
        }

        let row_modified_columns = dedupe_update_columns(row_modified_columns);
        // Flag whether any touched column participates in a declared unique
        // constraint (comparison is case-insensitive, matching declarations).
        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
            row_modified_columns.iter().any(|column| {
                plan.unique_columns
                    .keys()
                    .any(|unique| unique.eq_ignore_ascii_case(column))
            })
        });

        // Statement-level riders are constants by construction → static.
        if let Some(ttl_ms) = query.ttl_ms {
            static_metadata_assignments
                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
        }
        if let Some(expires_at_ms) = query.expires_at_ms {
            static_metadata_assignments.push((
                "_expires_at".to_string(),
                metadata_u64_to_value(expires_at_ms),
            ));
        }
        for (key, val) in &query.with_metadata {
            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
        }

        Ok(CompiledUpdatePlan {
            static_field_assignments,
            static_metadata_assignments,
            dynamic_assignments,
            row_contract_plan,
            row_modified_columns,
            row_touches_unique_columns,
        })
    }
1480
    /// Evaluate the plan's dynamic assignments against one concrete entity.
    ///
    /// A `UnifiedRecord` view of the entity is built lazily — only when at
    /// least one dynamic assignment exists — and shared by every expression
    /// evaluation. Results are split into field vs metadata assignments,
    /// mirroring the static split in `CompiledUpdatePlan`.
    ///
    /// Errors: compound assignment on a non-row/non-graph entity, a record
    /// that cannot be materialized, or an RHS expression that fails to
    /// evaluate.
    fn materialize_update_assignments_for_entity(
        &self,
        query: &UpdateQuery,
        entity: &UnifiedEntity,
        compiled_plan: &CompiledUpdatePlan,
    ) -> RedDBResult<MaterializedUpdateAssignments> {
        let mut assignments = MaterializedUpdateAssignments::default();
        // Lazily-built runtime record, shared by all dynamic assignments.
        let mut record: Option<UnifiedRecord> = None;

        for assignment in &compiled_plan.dynamic_assignments {
            // Compound forms are only defined for row and graph entities.
            if assignment.compound_op.is_some()
                && !matches!(
                    entity.data,
                    EntityData::Row(_) | EntityData::Node(_) | EntityData::Edge(_)
                )
            {
                return Err(RedDBError::Query(format!(
                    "compound assignment is only supported for row or graph UPDATE column '{}'",
                    assignment.column
                )));
            }
            if record.is_none() {
                record = runtime_any_record_from_entity_ref(entity);
            }
            let Some(record) = record.as_ref() else {
                return Err(RedDBError::Query(format!(
                    "UPDATE could not materialize runtime record for entity {} in '{}'",
                    entity.id.raw(),
                    query.table
                )));
            };
            // Evaluate the RHS against the current record; the table name is
            // passed for both scope arguments.
            let rhs = super::expr_eval::evaluate_runtime_expr_with_db(
                Some(self.inner.db.as_ref()),
                &assignment.expr,
                record,
                Some(query.table.as_str()),
                Some(query.table.as_str()),
            )
            .ok_or_else(|| {
                RedDBError::Query(format!(
                    "failed to evaluate UPDATE expression for column '{}'",
                    assignment.column
                ))
            })?;
            // Compound ops combine the evaluated RHS with the current column
            // value; plain assignments take the RHS as-is.
            let value = if let Some(op) = assignment.compound_op {
                evaluate_compound_update_assignment(&assignment.column, record, op, rhs)?
            } else {
                rhs
            };

            if let Some(metadata_key) = assignment.metadata_key {
                // TTL pseudo-column: canonicalize into its metadata form.
                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
                let (canonical_key, canonical_value) =
                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
                assignments
                    .dynamic_metadata_assignments
                    .push((canonical_key.to_string(), canonical_value));
            } else {
                assignments.dynamic_field_assignments.push((
                    assignment.column.clone(),
                    normalize_row_update_value_for_rule(
                        &query.table,
                        self.resolve_crypto_sentinel(value)?,
                        assignment.row_rule.as_ref(),
                    )?,
                ));
            }
        }

        Ok(assignments)
    }
1552
    /// Apply one entity's materialized UPDATE assignments.
    ///
    /// Rows take the dedicated row-update path, which understands contract
    /// plans and unique-column maintenance. Every other entity shape is
    /// funnelled through generic patch operations — after rejecting writes
    /// to immutable graph identity fields for nodes/edges.
    fn apply_materialized_update_for_entity(
        &self,
        collection: String,
        entity: UnifiedEntity,
        compiled_plan: &CompiledUpdatePlan,
        assignments: MaterializedUpdateAssignments,
    ) -> RedDBResult<AppliedEntityMutation> {
        if matches!(entity.data, EntityData::Row(_)) {
            return self.apply_loaded_sql_update_row_core(
                collection,
                entity,
                &compiled_plan.static_field_assignments,
                assignments.dynamic_field_assignments,
                &compiled_plan.static_metadata_assignments,
                assignments.dynamic_metadata_assignments,
                compiled_plan.row_contract_plan.as_ref(),
                &compiled_plan.row_modified_columns,
                compiled_plan.row_touches_unique_columns,
            );
        }

        // Nodes/edges must not overwrite rid/label/endpoint identity fields.
        ensure_graph_identity_update_allowed(&entity, compiled_plan, &assignments)?;

        let operations = build_patch_operations_from_materialized_assignments(
            &entity,
            compiled_plan,
            assignments,
        );
        self.apply_loaded_patch_entity_core(
            collection,
            entity,
            crate::json::Value::Null,
            operations,
        )
    }
1588
    /// Execute DELETE FROM table WHERE filter
    ///
    /// Order of operations: write gate → collection-contract gate →
    /// RETURNING pre-image capture (when requested) → RLS policy folding →
    /// the actual scan-and-delete in `execute_delete_inner`.
    pub fn execute_delete(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // CollectionContract gate (#50) — see execute_update for
        // rationale. The gate handles APPEND ONLY rejection and is
        // the single point where future contract bits land.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Delete,
        )?;

        // RETURNING on DELETE: capture the pre-image via an internal
        // SELECT that reuses the same WHERE, then run the delete with
        // the RETURNING clause stripped, then project the captured
        // rows through the requested items. The extra SELECT is a
        // pragmatic MVP — a future pass can fuse the scan with the
        // delete to avoid the second pass over the heap.
        if let Some(items) = query.returning.clone() {
            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
                RedDBError::Query(
                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
                )
            })?;
            let captured = self.execute_query(&select_sql)?;

            // Recurse with RETURNING stripped; the recursive call falls
            // through to the RLS branch below before deleting anything.
            let mut inner_query = query.clone();
            inner_query.returning = None;
            let _ = self.execute_delete(raw_query, &inner_query)?;

            let snapshots: Vec<Vec<(String, Value)>> = captured
                .result
                .records
                .iter()
                .map(|rec| {
                    rec.iter_fields()
                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
                        .collect()
                })
                .collect();
            // NOTE(review): `affected` is the pre-image snapshot count, not
            // the count reported by the inner delete (whose result is
            // discarded above). If the inner delete removes fewer rows —
            // e.g. the RLS branch narrows its filter, or rows change between
            // the two passes — this can overstate the real count. Confirm
            // whether the snapshot-based count is intentional.
            let affected = snapshots.len() as u64;
            let result = build_returning_result(&items, &snapshots, None);

            let mut response = RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "delete",
                "runtime-dml-returning",
            );
            response.result = result;
            return Ok(response);
        }
        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
        //
        // When the table has RLS enabled, gate the DELETE by the
        // per-role policy set: mutations only touch rows that *every*
        // matching `FOR DELETE` policy would accept. No policies =>
        // zero rows affected (PG restrictive-default).
        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Delete,
            );
            // No applicable policy: restrictive default, nothing is deleted.
            let Some(policy) = rls_filter else {
                return Ok(RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "delete",
                    "runtime-dml-rls",
                ));
            };
            // Fold the policy predicate into the user's WHERE before
            // dispatching — the remainder of this function reads the
            // filter from `query` via `effective_delete_filter`, which
            // respects the updated value.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            return self.execute_delete_inner(raw_query, &augmented);
        }
        self.execute_delete_inner(raw_query, query)
    }
1680
    /// Scan-and-delete core shared by the plain and RLS-augmented paths.
    ///
    /// Finds target ids via `DmlTargetScan`, snapshots pre-images only when
    /// some subscriber listens for delete events, then deletes in chunks of
    /// `UPDATE_APPLY_CHUNK_SIZE`, emitting events per chunk.
    fn execute_delete_inner(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        let effective_filter = effective_delete_filter(query);

        // Find the rows that match the WHERE clause. The "find target
        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
        // the same scan strategy.
        let scan = super::dml_target_scan::DmlTargetScan::new(
            self,
            &query.table,
            effective_filter.as_ref(),
            None,
        );
        let ids_to_delete = scan.find_target_ids()?;

        // For event-enabled collections, snapshot the pre-delete state
        // before rows are physically removed.
        let needs_delete_events =
            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
            scan.row_json_pre_images(&ids_to_delete)
        } else {
            HashMap::new()
        };

        let mut affected: u64 = 0;
        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
            affected += count;
            if needs_delete_events && !lsns.is_empty() {
                // lsns.len() == actually-deleted entities; align with chunk ids.
                // `delete_batch` may skip missing entities, so we correlate by
                // the number returned (they're emitted in chunk order).
                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
                self.emit_delete_events_for_collection(
                    &query.table,
                    deleted_chunk,
                    &lsns,
                    &pre_images,
                )?;
            }
        }
        // Pre-images are no longer needed; drop them eagerly.
        pre_images.clear();

        if affected > 0 {
            self.note_table_write(&query.table);
        }

        Ok(RuntimeQueryResult::dml_result(
            raw_query.to_string(),
            affected,
            "delete",
            "runtime-dml",
        ))
    }
1739}
1740
1741fn ensure_graph_identity_update_allowed(
1742    entity: &UnifiedEntity,
1743    compiled_plan: &CompiledUpdatePlan,
1744    assignments: &MaterializedUpdateAssignments,
1745) -> RedDBResult<()> {
1746    if !matches!(entity.data, EntityData::Node(_) | EntityData::Edge(_)) {
1747        return Ok(());
1748    }
1749
1750    for (column, _) in compiled_plan
1751        .static_field_assignments
1752        .iter()
1753        .chain(assignments.dynamic_field_assignments.iter())
1754    {
1755        if is_immutable_graph_identity_field(column) {
1756            return Err(RedDBError::Query(format!(
1757                "immutable graph field '{column}' cannot be updated"
1758            )));
1759        }
1760    }
1761
1762    Ok(())
1763}
1764
/// Whether `column` names a graph identity field that UPDATE must never
/// touch (case-insensitive match against the reserved set).
fn is_immutable_graph_identity_field(column: &str) -> bool {
    const RESERVED: [&str; 6] = ["rid", "label", "from_rid", "to_rid", "from", "to"];
    RESERVED
        .iter()
        .any(|reserved| reserved.eq_ignore_ascii_case(column))
}
1770
1771fn build_patch_operations_from_materialized_assignments(
1772    entity: &UnifiedEntity,
1773    compiled_plan: &CompiledUpdatePlan,
1774    assignments: MaterializedUpdateAssignments,
1775) -> Vec<PatchEntityOperation> {
1776    let mut operations = Vec::with_capacity(
1777        compiled_plan.static_field_assignments.len()
1778            + compiled_plan.static_metadata_assignments.len()
1779            + assignments.dynamic_field_assignments.len()
1780            + assignments.dynamic_metadata_assignments.len(),
1781    );
1782
1783    for (column, value) in &compiled_plan.static_field_assignments {
1784        operations.push(PatchEntityOperation {
1785            op: PatchEntityOperationType::Set,
1786            path: update_patch_path_for_entity(entity, column),
1787            value: Some(storage_value_to_json(value)),
1788        });
1789    }
1790
1791    for (column, value) in assignments.dynamic_field_assignments {
1792        operations.push(PatchEntityOperation {
1793            op: PatchEntityOperationType::Set,
1794            path: update_patch_path_for_entity(entity, &column),
1795            value: Some(storage_value_to_json(&value)),
1796        });
1797    }
1798
1799    for (key, value) in &compiled_plan.static_metadata_assignments {
1800        operations.push(PatchEntityOperation {
1801            op: PatchEntityOperationType::Set,
1802            path: vec!["metadata".to_string(), key.clone()],
1803            value: Some(metadata_value_to_json(value)),
1804        });
1805    }
1806
1807    for (key, value) in assignments.dynamic_metadata_assignments {
1808        operations.push(PatchEntityOperation {
1809            op: PatchEntityOperationType::Set,
1810            path: vec!["metadata".to_string(), key],
1811            value: Some(metadata_value_to_json(&value)),
1812        });
1813    }
1814
1815    operations
1816}
1817
1818fn update_patch_path_for_entity(entity: &UnifiedEntity, column: &str) -> Vec<String> {
1819    if matches!(
1820        (&entity.kind, &entity.data),
1821        (
1822            crate::storage::EntityKind::GraphNode(_),
1823            EntityData::Node(_)
1824        )
1825    ) && column.eq_ignore_ascii_case("node_type")
1826    {
1827        return vec!["node_type".to_string()];
1828    }
1829    if matches!(
1830        (&entity.kind, &entity.data),
1831        (
1832            crate::storage::EntityKind::GraphEdge(_),
1833            EntityData::Edge(_)
1834        )
1835    ) && column.eq_ignore_ascii_case("weight")
1836    {
1837        return vec!["weight".to_string()];
1838    }
1839    vec!["fields".to_string(), column.to_string()]
1840}
1841
1842/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
1843/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
1844/// capture the pre-image before actually removing the rows. Returns
1845/// `None` when the input does not start with `DELETE`.
1846///
1847/// Case-insensitive on the keywords. Preserves everything between
1848/// the table name and the RETURNING clause, so WHERE / ORDER BY /
1849/// LIMIT survive untouched. The RETURNING tail — if present — is
1850/// truncated at the first top-level `RETURNING` token.
1851fn delete_to_select_sql(sql: &str) -> Option<String> {
1852    let trimmed = sql.trim_start();
1853    let lowered = trimmed.to_ascii_lowercase();
1854    if !lowered.starts_with("delete ") && !lowered.starts_with("delete\t") {
1855        return None;
1856    }
1857    // Find `FROM` after DELETE.
1858    let from_idx = lowered.find(" from ")?;
1859    let after_from = &trimmed[from_idx + " from ".len()..];
1860    let after_from_lc = &lowered[from_idx + " from ".len()..];
1861
1862    // Cut off the RETURNING tail (a naive search — the RETURNING
1863    // clause only appears once per statement at top level in our
1864    // grammar). Matches whitespace-bounded tokens to avoid clipping
1865    // `RETURNING` inside a string literal.
1866    let mut body = after_from.to_string();
1867    if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
1868        body.truncate(pos);
1869    }
1870    Some(format!("SELECT * FROM {}", body.trim_end()))
1871}
1872
/// Find the byte offset of a whitespace-bounded keyword in a
/// lowercased haystack, skipping matches inside single-quoted
/// string literals. Naive — no escape handling — but enough for
/// the shapes the DML parser emits.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let hay = haystack.as_bytes();
    let pat = needle.as_bytes();
    let mut inside_literal = false;
    let mut pos = 0usize;

    while pos < hay.len() {
        match hay[pos] {
            // A quote flips literal state; never matched against itself.
            b'\'' => inside_literal = !inside_literal,
            _ if !inside_literal => {
                let end = pos + pat.len();
                let left_bounded = pos == 0 || hay[pos - 1].is_ascii_whitespace();
                let right_bounded =
                    end <= hay.len() && (end == hay.len() || hay[end].is_ascii_whitespace());
                if left_bounded && right_bounded && &hay[pos..end] == pat {
                    return Some(pos);
                }
            }
            _ => {}
        }
        pos += 1;
    }
    None
}
1901
/// Build a `UnifiedResult` from the rows affected by a DML statement plus
/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
/// for one affected row; `outputs`, when provided, supplies the engine-
/// assigned entity id for the same row (INSERT path). Projection honours
/// the RETURNING items: `*` expands to every snapshot column plus
/// the public row envelope when available.
fn build_returning_result(
    items: &[ReturningItem],
    snapshots: &[Vec<(String, Value)>],
    outputs: Option<&[CreateEntityOutput]>,
) -> UnifiedResult {
    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
    // Whether the FIRST output's entity qualifies for the public envelope
    // (node/edge/row/kv/document); the whole batch is treated uniformly
    // based on that first entity.
    let public_item_outputs = outputs.is_some_and(|outs| {
        outs.first()
            .and_then(|out| out.entity.as_ref())
            .is_some_and(|entity| public_returning_item_kind(entity).is_some())
    });

    // Assemble the projected column list. For `*`, envelope columns come
    // first, then every column of the first snapshot; explicit items are
    // taken verbatim (in order).
    let mut columns: Vec<String> = if project_all {
        let mut cols: Vec<String> = Vec::new();
        if public_item_outputs {
            cols.extend(
                [
                    "rid",
                    "collection",
                    "kind",
                    "tenant",
                    "created_at",
                    "updated_at",
                ]
                .into_iter()
                .map(str::to_string),
            );
        } else if outputs.is_some() {
            // Outputs without a public envelope still expose the raw id.
            cols.push("red_entity_id".to_string());
        }
        if let Some(first) = snapshots.first() {
            for (name, _) in first {
                cols.push(name.clone());
            }
        }
        cols
    } else {
        items
            .iter()
            .filter_map(|it| match it {
                ReturningItem::Column(c) => Some(c.clone()),
                ReturningItem::All => None,
            })
            .collect()
    };
    // Guarantee unique order-preserving column list.
    {
        let mut seen = std::collections::HashSet::new();
        columns.retain(|c| seen.insert(c.clone()));
    }

    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
    for (idx, snap) in snapshots.iter().enumerate() {
        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
        // Seed envelope values from the matching output (if any) before the
        // snapshot fields — snapshot entries with the same key win below.
        if let Some(outs) = outputs {
            if let Some(out) = outs.get(idx) {
                if let Some(entity) = out.entity.as_ref() {
                    if let Some(kind) = public_returning_item_kind(entity) {
                        values.insert(
                            Arc::clone(&sys_key_rid()),
                            Value::UnsignedInteger(out.id.raw()),
                        );
                        values.insert(
                            Arc::clone(&sys_key_collection()),
                            Value::text(entity.kind.collection().to_string()),
                        );
                        values.insert(Arc::clone(&sys_key_kind()), Value::text(kind.to_string()));
                        values.insert(
                            Arc::clone(&sys_key_created_at()),
                            Value::UnsignedInteger(entity.created_at),
                        );
                        values.insert(
                            Arc::clone(&sys_key_updated_at()),
                            Value::UnsignedInteger(entity.updated_at),
                        );
                    } else {
                        values.insert(
                            Arc::clone(&sys_key_red_entity_id()),
                            Value::Integer(out.id.raw() as i64),
                        );
                    }
                } else {
                    values.insert(
                        Arc::clone(&sys_key_red_entity_id()),
                        Value::Integer(out.id.raw() as i64),
                    );
                }
            }
        }
        // Snapshot fields overwrite any envelope key of the same name.
        for (name, val) in snap {
            values.insert(Arc::from(name.as_str()), val.clone());
        }
        // `tenant` falls back to the row's `tenant_id` (or Null) so the
        // envelope column is always populated.
        if !values.contains_key("tenant") {
            let tenant = values.get("tenant_id").cloned().unwrap_or(Value::Null);
            values.insert(Arc::clone(&sys_key_tenant()), tenant);
        }
        let mut rec = UnifiedRecord::default();
        // Only keep projected columns on the record.
        for col in &columns {
            if let Some(v) = values.get(col.as_str()) {
                rec.set_arc(Arc::from(col.as_str()), v.clone());
            }
        }
        records.push(rec);
    }

    UnifiedResult {
        columns,
        records,
        stats: Default::default(),
        pre_serialized_json: None,
    }
}
2021
2022fn public_returning_item_kind(entity: &crate::storage::UnifiedEntity) -> Option<&'static str> {
2023    match (&entity.kind, &entity.data) {
2024        (crate::storage::EntityKind::GraphNode(_), crate::storage::EntityData::Node(_)) => {
2025            Some("node")
2026        }
2027        (crate::storage::EntityKind::GraphEdge(_), crate::storage::EntityData::Edge(_)) => {
2028            Some("edge")
2029        }
2030        (_, crate::storage::EntityData::Row(_)) => Some(public_returning_row_kind(entity)),
2031        _ => None,
2032    }
2033}
2034
2035fn public_returning_row_kind(entity: &crate::storage::UnifiedEntity) -> &'static str {
2036    let Some(row) = entity.data.as_row() else {
2037        return "row";
2038    };
2039
2040    let is_kv = row.named.as_ref().is_some_and(|named| {
2041        (named.len() == 2 && named.contains_key("key") && named.contains_key("value"))
2042            || (named.len() == 1 && (named.contains_key("key") || named.contains_key("value")))
2043    });
2044    if is_kv {
2045        return "kv";
2046    }
2047
2048    let is_document = row
2049        .named
2050        .as_ref()
2051        .is_some_and(|named| named.values().any(runtime_returning_documentish_value))
2052        || row.columns.iter().any(runtime_returning_documentish_value);
2053    if is_document {
2054        "document"
2055    } else {
2056        "row"
2057    }
2058}
2059
2060fn runtime_returning_documentish_value(value: &Value) -> bool {
2061    matches!(value, Value::Json(_) | Value::Blob(_))
2062}
2063
2064fn row_insert_returning_snapshots(
2065    outputs: &[CreateEntityOutput],
2066    fallback: Vec<Vec<(String, Value)>>,
2067) -> Vec<Vec<(String, Value)>> {
2068    outputs
2069        .iter()
2070        .enumerate()
2071        .map(|(idx, out)| {
2072            out.entity
2073                .as_ref()
2074                .map(entity_row_fields_snapshot)
2075                .filter(|snap| !snap.is_empty())
2076                .unwrap_or_else(|| fallback.get(idx).cloned().unwrap_or_default())
2077        })
2078        .collect()
2079}
2080
2081fn graph_insert_returning_snapshots(
2082    store: &crate::storage::unified::UnifiedStore,
2083    collection: &str,
2084    ids: &[EntityId],
2085) -> Vec<Vec<(String, Value)>> {
2086    let Some(manager) = store.get_collection(collection) else {
2087        return Vec::new();
2088    };
2089
2090    ids.iter()
2091        .filter_map(|id| manager.get(*id))
2092        .filter_map(|entity| {
2093            let mut record = runtime_any_record_from_entity_ref(&entity)?;
2094            record.set_arc(sys_key_collection(), Value::text(collection.to_string()));
2095            Some(record)
2096        })
2097        .map(|record| {
2098            record
2099                .iter_fields()
2100                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2101                .collect()
2102        })
2103        .collect()
2104}
2105
2106fn graph_update_returning_snapshots(
2107    runtime: &RedDBRuntime,
2108    collection: &str,
2109    ids: &[EntityId],
2110) -> Vec<Vec<(String, Value)>> {
2111    let store = runtime.db().store();
2112    let Some(manager) = store.get_collection(collection) else {
2113        return Vec::new();
2114    };
2115
2116    manager
2117        .get_many(ids)
2118        .into_iter()
2119        .flatten()
2120        .filter_map(|entity| runtime_any_record_from_entity_ref(&entity))
2121        .map(|record| {
2122            record
2123                .iter_fields()
2124                .map(|(key, value)| (key.as_ref().to_string(), value.clone()))
2125                .collect()
2126        })
2127        .collect()
2128}
2129
2130fn ensure_update_target_contract(
2131    runtime: &RedDBRuntime,
2132    collection: &str,
2133    target: UpdateTarget,
2134) -> RedDBResult<()> {
2135    let Some(contract) = runtime.db().collection_contract(collection) else {
2136        return Ok(());
2137    };
2138    if update_target_contract_is_advisory(&contract)
2139        || update_target_allows_model(contract.declared_model, update_target_model(target))
2140    {
2141        return Ok(());
2142    }
2143    Err(RedDBError::InvalidOperation(format!(
2144        "collection '{}' is declared as '{}' and does not allow '{}' updates",
2145        collection,
2146        update_model_name(contract.declared_model),
2147        update_model_name(update_target_model(target))
2148    )))
2149}
2150
2151fn update_target_contract_is_advisory(contract: &crate::physical::CollectionContract) -> bool {
2152    matches!(
2153        (&contract.origin, &contract.schema_mode),
2154        (
2155            crate::physical::ContractOrigin::Implicit,
2156            crate::catalog::SchemaMode::Dynamic,
2157        )
2158    )
2159}
2160
2161fn update_target_model(target: UpdateTarget) -> crate::catalog::CollectionModel {
2162    match target {
2163        UpdateTarget::Rows => crate::catalog::CollectionModel::Table,
2164        UpdateTarget::Documents => crate::catalog::CollectionModel::Document,
2165        UpdateTarget::Kv => crate::catalog::CollectionModel::Kv,
2166        UpdateTarget::Nodes | UpdateTarget::Edges => crate::catalog::CollectionModel::Graph,
2167    }
2168}
2169
2170fn update_target_allows_model(
2171    declared_model: crate::catalog::CollectionModel,
2172    requested_model: crate::catalog::CollectionModel,
2173) -> bool {
2174    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
2175}
2176
2177fn update_model_name(model: crate::catalog::CollectionModel) -> &'static str {
2178    match model {
2179        crate::catalog::CollectionModel::Table => "table",
2180        crate::catalog::CollectionModel::Document => "document",
2181        crate::catalog::CollectionModel::Graph => "graph",
2182        crate::catalog::CollectionModel::Vector => "vector",
2183        crate::catalog::CollectionModel::Hll => "hll",
2184        crate::catalog::CollectionModel::Sketch => "sketch",
2185        crate::catalog::CollectionModel::Filter => "filter",
2186        crate::catalog::CollectionModel::Kv => "kv",
2187        crate::catalog::CollectionModel::Config => "config",
2188        crate::catalog::CollectionModel::Vault => "vault",
2189        crate::catalog::CollectionModel::Mixed => "mixed",
2190        crate::catalog::CollectionModel::TimeSeries => "timeseries",
2191        crate::catalog::CollectionModel::Queue => "queue",
2192        crate::catalog::CollectionModel::Metrics => "metrics",
2193    }
2194}
2195
/// Validate (or lazily create) the collection contract for a graph INSERT.
///
/// Existing contracts: the insert is allowed when the contract is advisory
/// (implicit origin + dynamic schema) or the declared model is `Graph` or
/// `Mixed`; any other declared model rejects the write.
///
/// Missing contracts: an implicit, dynamic `Graph` contract is persisted so
/// later writes have something to validate against.
fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
    let db = runtime.db();
    if let Some(contract) = db.collection_contract(collection) {
        // Implicit + dynamic contracts only record observed usage; they never
        // block a write (same rule as update_target_contract_is_advisory).
        let advisory_implicit_dynamic = matches!(
            (&contract.origin, &contract.schema_mode),
            (
                crate::physical::ContractOrigin::Implicit,
                crate::catalog::SchemaMode::Dynamic,
            )
        );
        if advisory_implicit_dynamic
            || matches!(
                contract.declared_model,
                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
            )
        {
            return Ok(());
        }
        return Err(RedDBError::InvalidOperation(format!(
            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
            collection, contract.declared_model
        )));
    }

    // No contract yet: stamp creation/update times with the current wall
    // clock (0 if the clock is before the epoch) and persist an implicit
    // Graph contract with every optional feature disabled.
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    db.save_collection_contract(crate::physical::CollectionContract {
        name: collection.to_string(),
        declared_model: crate::catalog::CollectionModel::Graph,
        schema_mode: crate::catalog::SchemaMode::Dynamic,
        origin: crate::physical::ContractOrigin::Implicit,
        version: 1,
        created_at_unix_ms: now,
        updated_at_unix_ms: now,
        // Preserve any collection-level default TTL configured elsewhere.
        default_ttl_ms: db.collection_default_ttl_ms(collection),
        vector_dimension: None,
        vector_metric: None,
        context_index_fields: Vec::new(),
        declared_columns: Vec::new(),
        table_def: None,
        timestamps_enabled: false,
        context_index_enabled: false,
        metrics_raw_retention_ms: None,
        metrics_rollup_policies: Vec::new(),
        metrics_tenant_identity: None,
        metrics_namespace: None,
        append_only: false,
        subscriptions: Vec::new(),
    })
    .map(|_| ())
    .map_err(|err| RedDBError::Internal(err.to_string()))
}
2250
2251fn update_needs_rmw_lock(query: &UpdateQuery) -> bool {
2252    query
2253        .assignment_exprs
2254        .iter()
2255        .enumerate()
2256        .any(|(idx, (column, expr))| {
2257            query
2258                .compound_assignment_ops
2259                .get(idx)
2260                .is_some_and(|op| op.is_some())
2261                || expr_references_update_column(expr, &query.table, column)
2262        })
2263}
2264
2265fn evaluate_compound_update_assignment(
2266    column: &str,
2267    record: &UnifiedRecord,
2268    op: BinOp,
2269    rhs: Value,
2270) -> RedDBResult<Value> {
2271    let lhs = record.get(column).ok_or_else(|| {
2272        RedDBError::Query(format!(
2273            "compound assignment requires existing numeric field '{column}'"
2274        ))
2275    })?;
2276    if matches!(lhs, Value::Null) {
2277        return Err(RedDBError::Query(format!(
2278            "compound assignment requires non-null numeric field '{column}'"
2279        )));
2280    }
2281    apply_compound_numeric_op(column, op, lhs, &rhs)
2282}
2283
/// Apply a compound-assignment arithmetic op (`+= -= *= /= %=`) to two
/// `Value`s with SQL-ish numeric promotion:
///
/// * Any float operand — or division, which is always performed in floating
///   point — takes the f64 path; non-finite results are rejected.
/// * Otherwise both operands are widened to i128 and checked arithmetic is
///   used, so overflow surfaces as an error instead of wrapping.
/// * Integer results are narrowed back to the left-hand side's type
///   (`UnsignedInteger` stays unsigned, everything else becomes `Integer`),
///   erroring when the value does not fit.
fn apply_compound_numeric_op(
    column: &str,
    op: BinOp,
    lhs: &Value,
    rhs: &Value,
) -> RedDBResult<Value> {
    let Some(lhs_number) = CompoundNumber::from_value(lhs) else {
        return Err(RedDBError::Query(format!(
            "compound assignment requires numeric field '{column}'"
        )));
    };
    let Some(rhs_number) = CompoundNumber::from_value(rhs) else {
        return Err(RedDBError::Query(format!(
            "compound assignment requires numeric right-hand value for field '{column}'"
        )));
    };

    // Division is routed here unconditionally, so `7 /= 2` yields 3.5
    // rather than truncated integer division.
    if lhs_number.is_float() || rhs_number.is_float() || matches!(op, BinOp::Div) {
        let a = lhs_number.as_f64();
        let b = rhs_number.as_f64();
        let out = match op {
            BinOp::Add => a + b,
            BinOp::Sub => a - b,
            BinOp::Mul => a * b,
            BinOp::Div => {
                if b == 0.0 {
                    return Err(RedDBError::Query(format!(
                        "division by zero in compound assignment for field '{column}'"
                    )));
                }
                a / b
            }
            BinOp::Mod => {
                if b == 0.0 {
                    return Err(RedDBError::Query(format!(
                        "modulo by zero in compound assignment for field '{column}'"
                    )));
                }
                a % b
            }
            _ => {
                return Err(RedDBError::Query(format!(
                    "unsupported compound assignment operator for field '{column}'"
                )));
            }
        };
        // Infinity / NaN from the float ops are reported as overflow.
        if !out.is_finite() {
            return Err(RedDBError::Query(format!(
                "numeric overflow in compound assignment for field '{column}'"
            )));
        }
        return Ok(Value::Float(out));
    }

    // Integer path: i128 intermediates so i64/u64 operands cannot overflow
    // before the checked op runs.
    let a = lhs_number.as_i128();
    let b = rhs_number.as_i128();
    let out = match op {
        BinOp::Add => a.checked_add(b),
        BinOp::Sub => a.checked_sub(b),
        BinOp::Mul => a.checked_mul(b),
        BinOp::Mod => {
            if b == 0 {
                return Err(RedDBError::Query(format!(
                    "modulo by zero in compound assignment for field '{column}'"
                )));
            }
            a.checked_rem(b)
        }
        BinOp::Div => unreachable!("integer division is handled by the float branch"),
        // Unsupported operators fall through to the overflow error below.
        _ => None,
    }
    .ok_or_else(|| {
        RedDBError::Query(format!(
            "numeric overflow in compound assignment for field '{column}'"
        ))
    })?;

    // Narrow back to the LHS's storage type.
    if matches!(lhs, Value::UnsignedInteger(_)) {
        let value = u64::try_from(out).map_err(|_| {
            RedDBError::Query(format!(
                "numeric overflow in compound assignment for field '{column}'"
            ))
        })?;
        Ok(Value::UnsignedInteger(value))
    } else {
        let value = i64::try_from(out).map_err(|_| {
            RedDBError::Query(format!(
                "numeric overflow in compound assignment for field '{column}'"
            ))
        })?;
        Ok(Value::Integer(value))
    }
}
2377
/// Numeric view of a `Value` used by compound assignments: either an exact
/// i128 (covers i64 and u64 sources) or an f64.
#[derive(Clone, Copy)]
enum CompoundNumber {
    Integer(i128),
    Float(f64),
}

impl CompoundNumber {
    /// Classify a `Value` as a number, or `None` for non-numeric values.
    /// Non-finite floats are rejected up front.
    fn from_value(value: &Value) -> Option<Self> {
        match value {
            Value::Integer(value) | Value::BigInt(value) => Some(Self::Integer(*value as i128)),
            Value::UnsignedInteger(value) => Some(Self::Integer(*value as i128)),
            Value::Float(value) => value.is_finite().then_some(Self::Float(*value)),
            // NOTE(review): assumes Decimal is a scaled integer with 4
            // implied decimal places (hence / 10_000) — confirm against the
            // Value::Decimal definition.
            Value::Decimal(value) => Some(Self::Float(*value as f64 / 10_000.0)),
            _ => None,
        }
    }

    fn is_float(self) -> bool {
        matches!(self, Self::Float(_))
    }

    /// Lossy widening for the float arithmetic path.
    fn as_f64(self) -> f64 {
        match self {
            Self::Integer(value) => value as f64,
            Self::Float(value) => value,
        }
    }

    /// Exact integer value; callers must have checked `is_float()` first —
    /// the float arm is unreachable by construction in apply_compound_numeric_op.
    fn as_i128(self) -> i128 {
        match self {
            Self::Integer(value) => value,
            Self::Float(_) => unreachable!("float compound number used as integer"),
        }
    }
}
2413
2414fn expr_references_update_column(expr: &Expr, table_name: &str, target_column: &str) -> bool {
2415    match expr {
2416        Expr::Literal { .. } | Expr::Parameter { .. } | Expr::Subquery { .. } => false,
2417        Expr::Column { field, .. } => {
2418            field_ref_matches_update_column(field, table_name, target_column)
2419        }
2420        Expr::BinaryOp { lhs, rhs, .. } => {
2421            expr_references_update_column(lhs, table_name, target_column)
2422                || expr_references_update_column(rhs, table_name, target_column)
2423        }
2424        Expr::UnaryOp { operand, .. } | Expr::Cast { inner: operand, .. } => {
2425            expr_references_update_column(operand, table_name, target_column)
2426        }
2427        Expr::FunctionCall { args, .. } => args
2428            .iter()
2429            .any(|arg| expr_references_update_column(arg, table_name, target_column)),
2430        Expr::Case {
2431            branches, else_, ..
2432        } => {
2433            branches.iter().any(|(cond, value)| {
2434                expr_references_update_column(cond, table_name, target_column)
2435                    || expr_references_update_column(value, table_name, target_column)
2436            }) || else_
2437                .as_deref()
2438                .is_some_and(|expr| expr_references_update_column(expr, table_name, target_column))
2439        }
2440        Expr::IsNull { operand, .. } => {
2441            expr_references_update_column(operand, table_name, target_column)
2442        }
2443        Expr::InList { target, values, .. } => {
2444            expr_references_update_column(target, table_name, target_column)
2445                || values
2446                    .iter()
2447                    .any(|value| expr_references_update_column(value, table_name, target_column))
2448        }
2449        Expr::Between {
2450            target, low, high, ..
2451        } => {
2452            expr_references_update_column(target, table_name, target_column)
2453                || expr_references_update_column(low, table_name, target_column)
2454                || expr_references_update_column(high, table_name, target_column)
2455        }
2456    }
2457}
2458
2459fn field_ref_matches_update_column(
2460    field: &FieldRef,
2461    table_name: &str,
2462    target_column: &str,
2463) -> bool {
2464    match field {
2465        FieldRef::TableColumn { table, column } => {
2466            column.eq_ignore_ascii_case(target_column)
2467                && (table.is_empty() || table.eq_ignore_ascii_case(table_name))
2468        }
2469        FieldRef::NodeProperty { .. } | FieldRef::EdgeProperty { .. } | FieldRef::NodeId { .. } => {
2470            false
2471        }
2472    }
2473}
2474
2475fn resolve_update_entity_by_logical_id(
2476    runtime: &RedDBRuntime,
2477    table: &str,
2478    logical_id: EntityId,
2479) -> Option<UnifiedEntity> {
2480    let store = runtime.inner.db.store();
2481    crate::runtime::table_row_mvcc_resolver::TableRowMvccReadResolver::current_statement()
2482        .resolve_logical_id(&store, table, logical_id)
2483}
2484
2485fn update_cdc_item_kind(
2486    runtime: &RedDBRuntime,
2487    collection: &str,
2488    entity: &UnifiedEntity,
2489) -> &'static str {
2490    match &entity.data {
2491        EntityData::Node(_) => return "node",
2492        EntityData::Edge(_) => return "edge",
2493        _ => {}
2494    }
2495
2496    match runtime
2497        .db()
2498        .collection_contract(collection)
2499        .map(|contract| contract.declared_model)
2500    {
2501        Some(crate::catalog::CollectionModel::Document) => "document",
2502        Some(crate::catalog::CollectionModel::Kv)
2503        | Some(crate::catalog::CollectionModel::Vault) => "kv",
2504        _ => "row",
2505    }
2506}
2507
2508fn ordered_update_target_ids(
2509    manager: &Arc<crate::storage::SegmentManager>,
2510    entity_ids: &[EntityId],
2511    order_by: &[OrderByClause],
2512    limit: Option<usize>,
2513) -> Vec<EntityId> {
2514    let mut entities: Vec<UnifiedEntity> =
2515        manager.get_many(entity_ids).into_iter().flatten().collect();
2516    entities.sort_by(|left, right| compare_update_order(left, right, order_by));
2517    if let Some(limit) = limit {
2518        entities.truncate(limit);
2519    }
2520    entities.into_iter().map(|entity| entity.id).collect()
2521}
2522
2523fn compare_update_order(
2524    left: &UnifiedEntity,
2525    right: &UnifiedEntity,
2526    order_by: &[OrderByClause],
2527) -> Ordering {
2528    for clause in order_by {
2529        let left_value = update_order_value(left, &clause.field);
2530        let right_value = update_order_value(right, &clause.field);
2531        let ordering = compare_update_order_values(
2532            left_value.as_ref(),
2533            right_value.as_ref(),
2534            clause.nulls_first,
2535        );
2536        if ordering != Ordering::Equal {
2537            return if clause.ascending {
2538                ordering
2539            } else {
2540                ordering.reverse()
2541            };
2542        }
2543    }
2544    left.logical_id().raw().cmp(&right.logical_id().raw())
2545}
2546
2547fn compare_update_order_values(
2548    left: Option<&Value>,
2549    right: Option<&Value>,
2550    nulls_first: bool,
2551) -> Ordering {
2552    match (left, right) {
2553        (None, None) => Ordering::Equal,
2554        (None, Some(_)) => {
2555            if nulls_first {
2556                Ordering::Less
2557            } else {
2558                Ordering::Greater
2559            }
2560        }
2561        (Some(_), None) => {
2562            if nulls_first {
2563                Ordering::Greater
2564            } else {
2565                Ordering::Less
2566            }
2567        }
2568        (Some(left), Some(right)) => {
2569            crate::storage::query::value_compare::total_compare_values(left, right)
2570        }
2571    }
2572}
2573
2574fn update_order_value(entity: &UnifiedEntity, field: &FieldRef) -> Option<Value> {
2575    let FieldRef::TableColumn { table, column } = field else {
2576        return None;
2577    };
2578    if !table.is_empty() {
2579        return None;
2580    }
2581    if column.eq_ignore_ascii_case("rid") {
2582        return Some(Value::UnsignedInteger(entity.logical_id().raw()));
2583    }
2584    match &entity.data {
2585        EntityData::Row(row) => row.get_field(column).cloned(),
2586        EntityData::Node(_) | EntityData::Edge(_) => runtime_any_record_from_entity_ref(entity)
2587            .and_then(|record| record.get(column).cloned()),
2588        _ => None,
2589    }
2590}
2591
/// Drop case-insensitive duplicate column names, keeping the first
/// occurrence (and its original casing) in order.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let mut unique: Vec<String> = Vec::with_capacity(columns.len());
    for column in columns {
        let already_seen = unique
            .iter()
            .any(|existing| existing.eq_ignore_ascii_case(&column));
        if !already_seen {
            unique.push(column);
        }
    }
    unique
}
2608
2609// =============================================================================
2610// Helper functions for extracting typed values from column/value pairs
2611// =============================================================================
2612
/// Legacy TTL metadata column names accepted in SQL statements.
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Map a (case-insensitive) column name onto its canonical static TTL
/// metadata key, or `None` when it is an ordinary data column.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|key| column.eq_ignore_ascii_case(key))
}
2626
2627/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
2628/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
2629/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
2630/// `_ttl_ms` and `_expires_at` are passed through.
2631fn canonicalize_sql_ttl_metadata(
2632    key: &'static str,
2633    value: MetadataValue,
2634) -> (&'static str, MetadataValue) {
2635    if key != "_ttl" {
2636        return (key, value);
2637    }
2638    let scaled = match value {
2639        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
2640        MetadataValue::Timestamp(ms_or_s) => {
2641            // Timestamp is already chosen for very large values; treat as
2642            // already-ms to avoid silent overflow.
2643            MetadataValue::Timestamp(ms_or_s)
2644        }
2645        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
2646        other => other,
2647    };
2648    ("_ttl_ms", scaled)
2649}
2650
/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
/// `SECRET('...')` literals. The runtime strips this marker and
/// applies the actual crypto transform during INSERT execution
/// (see `RedDBRuntime::resolve_crypto_sentinel`); values without the
/// prefix are stored as-is.
pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
2655
impl RedDBRuntime {
    /// Strip the plaintext sentinel from a `Value::Password` or
    /// `Value::Secret` produced by the parser and apply the real
    /// crypto transform. `Password` is always hashed with argon2id.
    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
    /// when `red.config.secret.auto_encrypt = true` (default).
    /// Values without the sentinel — and all other `Value` variants —
    /// pass through unchanged.
    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
        match value {
            Value::Password(marked) => {
                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
                } else {
                    // No sentinel: assume already hashed and store verbatim.
                    Ok(Value::Password(marked))
                }
            }
            Value::Secret(bytes) => {
                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
                    // Auto-encrypt must be explicitly enabled; otherwise a
                    // plaintext SECRET() literal is a hard error.
                    if !self.secret_auto_encrypt() {
                        return Err(RedDBError::Query(
                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
                             is false. Insert pre-encrypted bytes directly instead."
                                .to_string(),
                        ));
                    }
                    // Encryption key comes from the bootstrapped vault.
                    let key = self.secret_aes_key().ok_or_else(|| {
                        RedDBError::Query(
                            "SECRET() column encryption requires a bootstrapped \
                             vault (red.secret.aes_key is missing). Start the server \
                             with --vault to enable."
                                .to_string(),
                        )
                    })?;
                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
                } else {
                    // No sentinel: bytes are treated as pre-encrypted payload.
                    Ok(Value::Secret(bytes))
                }
            }
            other => Ok(other),
        }
    }
}
2698
2699/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
2700/// This is the on-disk representation of `Value::Secret`.
2701fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
2702    let nonce_bytes = crate::auth::store::random_bytes(12);
2703    let mut nonce = [0u8; 12];
2704    nonce.copy_from_slice(&nonce_bytes[..12]);
2705    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
2706    let mut out = Vec::with_capacity(12 + ct.len());
2707    out.extend_from_slice(&nonce);
2708    out.extend_from_slice(&ct);
2709    out
2710}
2711
2712/// Decode a `Value::Secret` payload back to plaintext. Returns
2713/// `None` when the payload is too short or AES-GCM authentication
2714/// fails (tampered or wrong key).
2715pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
2716    if payload.len() < 12 {
2717        return None;
2718    }
2719    let mut nonce = [0u8; 12];
2720    nonce.copy_from_slice(&payload[..12]);
2721    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
2722}
2723
2724fn split_insert_metadata(
2725    runtime: &RedDBRuntime,
2726    columns: &[String],
2727    values: &[Value],
2728) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
2729    let mut fields = Vec::new();
2730    let mut metadata = Vec::new();
2731
2732    for (column, value) in columns.iter().zip(values.iter()) {
2733        // Still support legacy _ttl columns for backward compat
2734        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
2735            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
2736            let (canonical_key, canonical_value) =
2737                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
2738            metadata.push((canonical_key.to_string(), canonical_value));
2739            continue;
2740        }
2741        fields.push((
2742            column.clone(),
2743            runtime.resolve_crypto_sentinel(value.clone())?,
2744        ));
2745    }
2746
2747    Ok((fields, metadata))
2748}
2749
2750/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
2751fn merge_with_clauses(
2752    metadata: &mut Vec<(String, MetadataValue)>,
2753    ttl_ms: Option<u64>,
2754    expires_at_ms: Option<u64>,
2755    with_metadata: &[(String, Value)],
2756) {
2757    if let Some(ms) = ttl_ms {
2758        metadata.push((
2759            "_ttl_ms".to_string(),
2760            if ms <= i64::MAX as u64 {
2761                MetadataValue::Int(ms as i64)
2762            } else {
2763                MetadataValue::Timestamp(ms)
2764            },
2765        ));
2766    }
2767    if let Some(ms) = expires_at_ms {
2768        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
2769    }
2770    for (key, value) in with_metadata {
2771        let meta_value = match value {
2772            Value::Text(s) => MetadataValue::String(s.to_string()),
2773            Value::Integer(n) => MetadataValue::Int(*n),
2774            Value::Float(n) => MetadataValue::Float(*n),
2775            Value::Boolean(b) => MetadataValue::Bool(*b),
2776            _ => MetadataValue::String(value.to_string()),
2777        };
2778        metadata.push((key.clone(), meta_value));
2779    }
2780}
2781
2782fn merge_vector_metadata_column(
2783    metadata: &mut Vec<(String, MetadataValue)>,
2784    columns: &[String],
2785    values: &[Value],
2786) -> RedDBResult<()> {
2787    let Some(value) = columns
2788        .iter()
2789        .position(|column| column.eq_ignore_ascii_case("metadata"))
2790        .map(|index| &values[index])
2791    else {
2792        return Ok(());
2793    };
2794    let json = match value {
2795        Value::Null => return Ok(()),
2796        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
2797            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
2798        })?,
2799        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
2800            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
2801        })?,
2802        other => {
2803            return Err(RedDBError::Query(format!(
2804                "column 'metadata' expected JSON object, got {other:?}"
2805            )))
2806        }
2807    };
2808    let parsed = metadata_from_json(&json)?;
2809    for (key, value) in parsed.iter() {
2810        metadata.push((key.clone(), value.clone()));
2811    }
2812    Ok(())
2813}
2814
2815fn apply_collection_default_ttl_metadata(
2816    runtime: &RedDBRuntime,
2817    collection: &str,
2818    metadata: &mut Vec<(String, MetadataValue)>,
2819) {
2820    if has_internal_ttl_metadata(metadata) {
2821        return;
2822    }
2823
2824    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
2825        return;
2826    };
2827
2828    metadata.push((
2829        "_ttl_ms".to_string(),
2830        if default_ttl_ms <= i64::MAX as u64 {
2831            MetadataValue::Int(default_ttl_ms as i64)
2832        } else {
2833            MetadataValue::Timestamp(default_ttl_ms)
2834        },
2835    ));
2836}
2837
2838fn ensure_non_tree_reserved_metadata_entries(
2839    metadata: &[(String, MetadataValue)],
2840) -> RedDBResult<()> {
2841    for (key, _) in metadata {
2842        ensure_non_tree_reserved_metadata_key(key)?;
2843    }
2844    Ok(())
2845}
2846
2847fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2848    if key.starts_with(TREE_METADATA_PREFIX) {
2849        return Err(RedDBError::Query(format!(
2850            "metadata key '{}' is reserved for managed trees",
2851            key
2852        )));
2853    }
2854    Ok(())
2855}
2856
2857fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2858    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2859        return Err(RedDBError::Query(format!(
2860            "edge label '{}' is reserved for managed trees",
2861            TREE_CHILD_EDGE_LABEL
2862        )));
2863    }
2864    Ok(())
2865}
2866
2867fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
2868    let mut columns = Vec::with_capacity(pairs.len());
2869    let mut values = Vec::with_capacity(pairs.len());
2870
2871    for (column, value) in pairs {
2872        columns.push(column.clone());
2873        values.push(value.clone());
2874    }
2875
2876    (columns, values)
2877}
2878
2879/// Find a required column value and return it as-is.
2880fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
2881    for (i, col) in columns.iter().enumerate() {
2882        if col.eq_ignore_ascii_case(name) {
2883            return Ok(values[i].clone());
2884        }
2885    }
2886    Err(RedDBError::Query(format!(
2887        "required column '{name}' not found in INSERT"
2888    )))
2889}
2890
2891/// Find a required column value and coerce to String.
2892fn find_column_value_string(
2893    columns: &[String],
2894    values: &[Value],
2895    name: &str,
2896) -> RedDBResult<String> {
2897    let val = find_column_value(columns, values, name)?;
2898    match val {
2899        Value::Text(s) => Ok(s.to_string()),
2900        Value::Integer(n) => Ok(n.to_string()),
2901        Value::Float(n) => Ok(n.to_string()),
2902        other => Err(RedDBError::Query(format!(
2903            "column '{name}' expected text, got {other:?}"
2904        ))),
2905    }
2906}
2907
2908fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
2909    let val = find_column_value(columns, values, name)?;
2910    match val {
2911        Value::Float(n) => Ok(n),
2912        Value::Integer(n) => Ok(n as f64),
2913        Value::UnsignedInteger(n) => Ok(n as f64),
2914        Value::Text(s) => s
2915            .parse::<f64>()
2916            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
2917        other => Err(RedDBError::Query(format!(
2918            "column '{name}' expected number, got {other:?}"
2919        ))),
2920    }
2921}
2922
2923/// Find an optional column value as String.
2924fn find_column_value_opt_string(
2925    columns: &[String],
2926    values: &[Value],
2927    name: &str,
2928) -> Option<String> {
2929    for (i, col) in columns.iter().enumerate() {
2930        if col.eq_ignore_ascii_case(name) {
2931            return match &values[i] {
2932                Value::Null => None,
2933                Value::Text(s) => Some(s.to_string()),
2934                Value::Integer(n) => Some(n.to_string()),
2935                Value::Float(n) => Some(n.to_string()),
2936                _ => None,
2937            };
2938        }
2939    }
2940    None
2941}
2942
2943/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
2944///
2945/// Accepts integer literals, decimal strings, and node labels resolved via
2946/// the per-collection graph label index (same source of truth that
2947/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
2948/// labels error so callers can fall back to the numeric id form.
2949fn resolve_edge_endpoint(
2950    store: &crate::storage::unified::UnifiedStore,
2951    collection: &str,
2952    columns: &[String],
2953    values: &[Value],
2954    name: &str,
2955) -> RedDBResult<u64> {
2956    let val = find_column_value(columns, values, name)?;
2957    match val {
2958        Value::Integer(n) => Ok(n as u64),
2959        Value::UnsignedInteger(n) => Ok(n),
2960        Value::Text(s) => {
2961            if let Ok(n) = s.parse::<u64>() {
2962                return Ok(n);
2963            }
2964            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
2965            match matches.len() {
2966                0 => Err(RedDBError::Query(format!(
2967                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
2968                ))),
2969                1 => Ok(matches[0].raw()),
2970                n => Err(RedDBError::Query(format!(
2971                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
2972                ))),
2973            }
2974        }
2975        other => Err(RedDBError::Query(format!(
2976            "column '{name}' expected integer or node label, got {other:?}"
2977        ))),
2978    }
2979}
2980
2981fn resolve_edge_endpoint_any(
2982    store: &crate::storage::unified::UnifiedStore,
2983    collection: &str,
2984    columns: &[String],
2985    values: &[Value],
2986    names: &[&str],
2987) -> RedDBResult<u64> {
2988    for name in names {
2989        if columns
2990            .iter()
2991            .any(|column| column.eq_ignore_ascii_case(name))
2992        {
2993            return resolve_edge_endpoint(store, collection, columns, values, name);
2994        }
2995    }
2996
2997    Err(RedDBError::Query(format!(
2998        "required column '{}' not found in INSERT",
2999        names.first().copied().unwrap_or("from_rid")
3000    )))
3001}
3002
3003/// Find a required column value and coerce to u64.
3004fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
3005    let val = find_column_value(columns, values, name)?;
3006    match val {
3007        Value::Integer(n) => Ok(n as u64),
3008        Value::UnsignedInteger(n) => Ok(n),
3009        Value::Text(s) => s
3010            .parse::<u64>()
3011            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
3012        other => Err(RedDBError::Query(format!(
3013            "column '{name}' expected integer, got {other:?}"
3014        ))),
3015    }
3016}
3017
3018/// Find an optional column value as f32.
3019fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
3020    for (i, col) in columns.iter().enumerate() {
3021        if col.eq_ignore_ascii_case(name) {
3022            return match &values[i] {
3023                Value::Float(n) => Some(*n as f32),
3024                Value::Integer(n) => Some(*n as f32),
3025                Value::Null => None,
3026                _ => None,
3027            };
3028        }
3029    }
3030    None
3031}
3032
3033/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
3034fn find_column_value_vec_f32(
3035    columns: &[String],
3036    values: &[Value],
3037    name: &str,
3038) -> RedDBResult<Vec<f32>> {
3039    let val = find_column_value(columns, values, name)?;
3040    match val {
3041        Value::Vector(v) => Ok(v),
3042        Value::Json(bytes) => {
3043            // Try to parse as JSON array of numbers
3044            let s = std::str::from_utf8(&bytes).map_err(|_| {
3045                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
3046            })?;
3047            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
3048                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
3049            })?;
3050            Ok(arr)
3051        }
3052        other => Err(RedDBError::Query(format!(
3053            "column '{name}' expected vector, got {other:?}"
3054        ))),
3055    }
3056}
3057
3058fn find_column_value_vec_f32_any(
3059    columns: &[String],
3060    values: &[Value],
3061    names: &[&str],
3062) -> RedDBResult<Vec<f32>> {
3063    for name in names {
3064        if columns
3065            .iter()
3066            .any(|column| column.eq_ignore_ascii_case(name))
3067        {
3068            return find_column_value_vec_f32(columns, values, name);
3069        }
3070    }
3071    Err(RedDBError::Query(format!(
3072        "required vector column '{}' not found in INSERT",
3073        names.join("' or '")
3074    )))
3075}
3076
3077/// Extract remaining properties (all columns not in the exclusion list).
3078fn extract_remaining_properties(
3079    columns: &[String],
3080    values: &[Value],
3081    exclude: &[&str],
3082) -> Vec<(String, Value)> {
3083    columns
3084        .iter()
3085        .zip(values.iter())
3086        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
3087        .map(|(col, val)| (col.clone(), val.clone()))
3088        .collect()
3089}
3090
3091fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
3092    let mut invalid = Vec::new();
3093    for column in columns {
3094        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
3095            invalid.push(column.clone());
3096        }
3097    }
3098
3099    if invalid.is_empty() {
3100        Ok(())
3101    } else {
3102        Err(RedDBError::Query(format!(
3103            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
3104            invalid.join(", ")
3105        )))
3106    }
3107}
3108
/// True when `column` (case-insensitively) is one of the fixed column names
/// a timeseries INSERT accepts.
///
/// Uses `eq_ignore_ascii_case` against the allow-list instead of allocating
/// a lowercased `String` on every call.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ALLOWED: [&str; 6] = ["metric", "value", "tags", "timestamp", "timestamp_ns", "time"];
    ALLOWED
        .iter()
        .any(|allowed| column.eq_ignore_ascii_case(allowed))
}
3115
3116fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
3117    let mut found = None;
3118
3119    for alias in ["timestamp_ns", "timestamp", "time"] {
3120        for (index, column) in columns.iter().enumerate() {
3121            if !column.eq_ignore_ascii_case(alias) {
3122                continue;
3123            }
3124
3125            if found.is_some() {
3126                return Err(RedDBError::Query(
3127                    "timeseries INSERT accepts only one timestamp column".to_string(),
3128                ));
3129            }
3130
3131            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
3132        }
3133    }
3134
3135    Ok(found)
3136}
3137
3138fn find_timeseries_tags(
3139    columns: &[String],
3140    values: &[Value],
3141) -> RedDBResult<std::collections::HashMap<String, String>> {
3142    for (index, column) in columns.iter().enumerate() {
3143        if column.eq_ignore_ascii_case("tags") {
3144            return parse_timeseries_tags(&values[index]);
3145        }
3146    }
3147    Ok(std::collections::HashMap::new())
3148}
3149
3150fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
3151    match value {
3152        Value::Null => Ok(std::collections::HashMap::new()),
3153        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
3154        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
3155        other => Err(RedDBError::Query(format!(
3156            "timeseries tags must be a JSON object or JSON text, got {other:?}"
3157        ))),
3158    }
3159}
3160
3161fn parse_timeseries_tags_json(
3162    bytes: &[u8],
3163) -> RedDBResult<std::collections::HashMap<String, String>> {
3164    let json: crate::json::Value = crate::json::from_slice(bytes)
3165        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
3166
3167    let object = match json {
3168        crate::json::Value::Object(object) => object,
3169        other => {
3170            return Err(RedDBError::Query(format!(
3171                "timeseries tags must be a JSON object, got {other:?}"
3172            )))
3173        }
3174    };
3175
3176    let mut tags = std::collections::HashMap::with_capacity(object.len());
3177    for (key, value) in object {
3178        tags.insert(key, json_tag_value_to_string(&value));
3179    }
3180    Ok(tags)
3181}
3182
3183fn json_tag_value_to_string(value: &crate::json::Value) -> String {
3184    match value {
3185        crate::json::Value::Null => "null".to_string(),
3186        crate::json::Value::Bool(value) => value.to_string(),
3187        crate::json::Value::Number(value) => value.to_string(),
3188        crate::json::Value::String(value) => value.clone(),
3189        other => other.to_string(),
3190    }
3191}
3192
3193fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
3194    match value {
3195        Value::UnsignedInteger(value) => Ok(*value),
3196        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
3197        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
3198        Value::Text(value) => value.parse::<u64>().map_err(|_| {
3199            RedDBError::Query(format!(
3200                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
3201            ))
3202        }),
3203        other => Err(RedDBError::Query(format!(
3204            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
3205        ))),
3206    }
3207}
3208
/// Current wall-clock time in nanoseconds since the Unix epoch, saturating
/// at `u64::MAX`; a clock set before the epoch reports zero.
fn current_unix_ns() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
3216
3217fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
3218    use crate::json::{Map, Value as JV};
3219    match value {
3220        MetadataValue::Null => JV::Null,
3221        MetadataValue::Bool(value) => JV::Bool(*value),
3222        MetadataValue::Int(value) => JV::Number(*value as f64),
3223        MetadataValue::Float(value) => JV::Number(*value),
3224        MetadataValue::String(value) => JV::String(value.clone()),
3225        MetadataValue::Bytes(value) => JV::Array(
3226            value
3227                .iter()
3228                .map(|value| JV::Number(*value as f64))
3229                .collect(),
3230        ),
3231        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
3232        MetadataValue::Array(values) => {
3233            JV::Array(values.iter().map(metadata_value_to_json).collect())
3234        }
3235        MetadataValue::Object(object) => {
3236            let entries = object
3237                .iter()
3238                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
3239                .collect();
3240            JV::Object(entries)
3241        }
3242        MetadataValue::Geo { lat, lon } => {
3243            let mut object = Map::new();
3244            object.insert("lat".to_string(), JV::Number(*lat));
3245            object.insert("lon".to_string(), JV::Number(*lon));
3246            JV::Object(object)
3247        }
3248        MetadataValue::Reference(target) => {
3249            let mut object = Map::new();
3250            object.insert(
3251                "collection".to_string(),
3252                JV::String(target.collection().to_string()),
3253            );
3254            object.insert(
3255                "entity_id".to_string(),
3256                JV::Number(target.entity_id().raw() as f64),
3257            );
3258            JV::Object(object)
3259        }
3260        MetadataValue::References(values) => {
3261            let refs = values
3262                .iter()
3263                .map(|target| {
3264                    let mut object = Map::new();
3265                    object.insert(
3266                        "collection".to_string(),
3267                        JV::String(target.collection().to_string()),
3268                    );
3269                    object.insert(
3270                        "entity_id".to_string(),
3271                        JV::Number(target.entity_id().raw() as f64),
3272                    );
3273                    JV::Object(object)
3274                })
3275                .collect();
3276            JV::Array(refs)
3277        }
3278    }
3279}
3280
3281fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
3282    match value {
3283        Value::Null => MetadataValue::Null,
3284        Value::Boolean(value) => MetadataValue::Bool(*value),
3285        Value::Integer(value) => MetadataValue::Int(*value),
3286        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
3287        Value::Float(value) => MetadataValue::Float(*value),
3288        Value::Text(value) => MetadataValue::String(value.to_string()),
3289        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
3290        Value::Timestamp(value) => {
3291            if *value >= 0 {
3292                metadata_u64_to_value(*value as u64)
3293            } else {
3294                MetadataValue::Int(*value)
3295            }
3296        }
3297        Value::TimestampMs(value) => {
3298            if *value >= 0 {
3299                metadata_u64_to_value(*value as u64)
3300            } else {
3301                MetadataValue::Int(*value)
3302            }
3303        }
3304        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
3305        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
3306        Value::Date(value) => MetadataValue::String(value.to_string()),
3307        Value::Time(value) => MetadataValue::String(value.to_string()),
3308        Value::Decimal(value) => MetadataValue::String(value.to_string()),
3309        Value::Ipv4(value) => MetadataValue::String(format!(
3310            "{}.{}.{}.{}",
3311            (value >> 24) & 0xFF,
3312            (value >> 16) & 0xFF,
3313            (value >> 8) & 0xFF,
3314            value & 0xFF
3315        )),
3316        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
3317        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3318        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
3319        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
3320            lat: *lat as f64 / 1_000_000.0,
3321            lon: *lon as f64 / 1_000_000.0,
3322        },
3323        Value::BigInt(value) => MetadataValue::String(value.to_string()),
3324        Value::TableRef(value) => MetadataValue::String(value.clone()),
3325        Value::PageRef(value) => MetadataValue::Int(*value as i64),
3326        Value::Password(value) => MetadataValue::String(value.clone()),
3327        Value::Array(values) => {
3328            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
3329        }
3330        _ => MetadataValue::String(value.to_string()),
3331    }
3332}
3333
3334fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
3335    match value {
3336        Value::Null => Ok(MetadataValue::Null),
3337        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
3338        Value::Integer(_) => Err(RedDBError::Query(format!(
3339            "column '{field}' must be non-negative for TTL metadata"
3340        ))),
3341        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
3342        Value::Float(value) if value.is_finite() => {
3343            if value.fract().abs() >= f64::EPSILON {
3344                return Err(RedDBError::Query(format!(
3345                    "column '{field}' must be an integer (TTL metadata must be an integer)"
3346                )));
3347            }
3348            if *value < 0.0 {
3349                return Err(RedDBError::Query(format!(
3350                    "column '{field}' must be non-negative for TTL metadata"
3351                )));
3352            }
3353            if *value > u64::MAX as f64 {
3354                return Err(RedDBError::Query(format!(
3355                    "column '{field}' value is too large"
3356                )));
3357            }
3358            Ok(metadata_u64_to_value(*value as u64))
3359        }
3360        Value::Float(_) => Err(RedDBError::Query(format!(
3361            "column '{field}' must be a finite number"
3362        ))),
3363        Value::Text(value) => {
3364            let value = value.trim();
3365            if let Ok(value) = value.parse::<u64>() {
3366                Ok(metadata_u64_to_value(value))
3367            } else if let Ok(value) = value.parse::<i64>() {
3368                if value < 0 {
3369                    return Err(RedDBError::Query(format!(
3370                        "column '{field}' must be non-negative for TTL metadata"
3371                    )));
3372                }
3373                Ok(metadata_u64_to_value(value as u64))
3374            } else if let Ok(value) = value.parse::<f64>() {
3375                if !value.is_finite() {
3376                    return Err(RedDBError::Query(format!(
3377                        "column '{field}' must be a finite number"
3378                    )));
3379                }
3380                if value.fract().abs() >= f64::EPSILON {
3381                    return Err(RedDBError::Query(format!(
3382                        "column '{field}' must be an integer (TTL metadata must be an integer)"
3383                    )));
3384                }
3385                if value < 0.0 {
3386                    return Err(RedDBError::Query(format!(
3387                        "column '{field}' must be non-negative for TTL metadata"
3388                    )));
3389                }
3390                if value > u64::MAX as f64 {
3391                    return Err(RedDBError::Query(format!(
3392                        "column '{field}' value is too large"
3393                    )));
3394                }
3395                Ok(metadata_u64_to_value(value as u64))
3396            } else {
3397                Err(RedDBError::Query(format!(
3398                    "column '{field}' expects a numeric value for TTL metadata"
3399                )))
3400            }
3401        }
3402        _ => Err(RedDBError::Query(format!(
3403            "column '{field}' expects a numeric value for TTL metadata"
3404        ))),
3405    }
3406}
3407
3408fn metadata_u64_to_value(value: u64) -> MetadataValue {
3409    if value <= i64::MAX as u64 {
3410        MetadataValue::Int(value as i64)
3411    } else {
3412        MetadataValue::Timestamp(value)
3413    }
3414}
3415
3416/// Phase 2 PG parity: inspect a column value and return `true` when
3417/// the dotted `tail` path is already present under it. Used by the
3418/// tenant auto-fill so rows that already carry an explicit value
3419/// (bulk import, admin insert on behalf of a tenant) are not
3420/// double-stamped with the session's current_tenant().
3421fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
3422    let json = match value {
3423        Value::Null => return false,
3424        Value::Json(bytes) | Value::Blob(bytes) => {
3425            match crate::json::from_slice::<crate::json::Value>(bytes) {
3426                Ok(v) => v,
3427                Err(_) => return false,
3428            }
3429        }
3430        Value::Text(s) => {
3431            let trimmed = s.trim_start();
3432            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
3433                return false;
3434            }
3435            match crate::json::from_str::<crate::json::Value>(s) {
3436                Ok(v) => v,
3437                Err(_) => return false,
3438            }
3439        }
3440        _ => return false,
3441    };
3442    let mut cursor = &json;
3443    for seg in tail.split('.') {
3444        match cursor {
3445            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
3446                Some((_, v)) => cursor = v,
3447                None => return false,
3448            },
3449            _ => return false,
3450        }
3451    }
3452    !matches!(cursor, crate::json::Value::Null)
3453}
3454
3455/// Phase 2 PG parity: take a column value (possibly Null / Text /
3456/// Json) and return a `Value::Json` with the dotted `tail` path set
3457/// to `tenant_id`. Preserves every pre-existing key.
3458///
3459/// Accepts:
3460/// * `Value::Null`  → fresh `{tail: tenant_id}` object
3461/// * `Value::Json(bytes)` → parse, navigate / create path, re-serialize
3462/// * `Value::text(s)` if `s` is valid JSON → same as Json
3463/// * anything else → error (user supplied a scalar where we need
3464///   a JSON container)
3465fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
3466    let mut root = match current {
3467        Value::Null => crate::json::Value::Object(Default::default()),
3468        Value::Json(bytes) | Value::Blob(bytes) => {
3469            crate::json::from_slice(&bytes).map_err(|err| {
3470                RedDBError::Query(format!(
3471                    "tenant auto-fill: root column is not valid JSON ({err})"
3472                ))
3473            })?
3474        }
3475        Value::Text(s) => {
3476            if s.trim().is_empty() {
3477                crate::json::Value::Object(Default::default())
3478            } else {
3479                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
3480                    RedDBError::Query(format!(
3481                        "tenant auto-fill: text root is not valid JSON ({err})"
3482                    ))
3483                })?
3484            }
3485        }
3486        other => {
3487            return Err(RedDBError::Query(format!(
3488                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
3489            )));
3490        }
3491    };
3492
3493    // Navigate path segments, creating intermediate objects on demand.
3494    let segments: Vec<&str> = tail.split('.').collect();
3495    let mut cursor: &mut crate::json::Value = &mut root;
3496    for (i, seg) in segments.iter().enumerate() {
3497        let is_last = i + 1 == segments.len();
3498        let map = match cursor {
3499            crate::json::Value::Object(m) => m,
3500            _ => {
3501                return Err(RedDBError::Query(format!(
3502                    "tenant auto-fill: segment '{seg}' is not inside an object"
3503                )));
3504            }
3505        };
3506        if is_last {
3507            map.insert(
3508                seg.to_string(),
3509                crate::json::Value::String(tenant_id.to_string()),
3510            );
3511            break;
3512        }
3513        cursor = map
3514            .entry(seg.to_string())
3515            .or_insert_with(|| crate::json::Value::Object(Default::default()));
3516    }
3517
3518    let bytes = crate::json::to_vec(&root).map_err(|err| {
3519        RedDBError::Query(format!(
3520            "tenant auto-fill: failed to re-serialize JSON ({err})"
3521        ))
3522    })?;
3523    Ok(Value::Json(bytes))
3524}
3525
3526#[cfg(test)]
3527mod tests {
3528    use crate::storage::schema::Value;
3529    use crate::storage::wal::{WalReader, WalRecord};
3530    use crate::{RedDBOptions, RedDBRuntime};
3531    use std::path::Path;
3532
3533    fn store_commit_batches(wal_path: &Path) -> Vec<Vec<Vec<u8>>> {
3534        WalReader::open(wal_path)
3535            .expect("wal opens")
3536            .iter()
3537            .map(|record| record.expect("wal record decodes").1)
3538            .filter_map(|record| match record {
3539                WalRecord::TxCommitBatch { actions, .. } => Some(actions),
3540                _ => None,
3541            })
3542            .collect()
3543    }
3544
3545    fn action_contains_text(action: &[u8], needle: &str) -> bool {
3546        action
3547            .windows(needle.len())
3548            .any(|window| window == needle.as_bytes())
3549    }
3550
    /// Execute `statement` and assert that the write to `source` and the
    /// write to `event_queue` landed in the SAME newly-appended WAL commit
    /// batch — i.e. WITH EVENTS persists the mutation and its queue event
    /// atomically.
    ///
    /// Matching is textual: a batch "belongs" to `source` when one of its
    /// actions contains the source name but not the queue name, and to
    /// `event_queue` when any action mentions the queue name.
    fn assert_statement_writes_collections_in_one_new_wal_batch(
        rt: &RedDBRuntime,
        wal_path: &Path,
        statement: &str,
        source: &str,
        event_queue: &str,
    ) {
        // Only batches appended by `statement` are inspected below.
        let before_batches = store_commit_batches(wal_path).len();

        rt.execute_query(statement).unwrap();

        let batches = store_commit_batches(wal_path);
        let statement_batches = &batches[before_batches..];
        // Index (within the new batches) of the source-collection write.
        let source_batch = statement_batches
            .iter()
            .position(|actions| {
                actions.iter().any(|action| {
                    action_contains_text(action, source)
                        && !action_contains_text(action, event_queue)
                })
            })
            .expect("source collection write batch is present");
        // Index of the queue-event write; must coincide with the above.
        let event_batch = statement_batches
            .iter()
            .position(|actions| {
                actions
                    .iter()
                    .any(|action| action_contains_text(action, event_queue))
            })
            .expect("event queue write batch is present");

        assert_eq!(
            source_batch, event_batch,
            "WITH EVENTS must persist the source write and queue event in the same WAL batch"
        );
    }
3587
3588    #[test]
3589    fn with_events_autocommit_persists_mutation_and_event_in_one_wal_batch() {
3590        let dir = tempfile::tempdir().unwrap();
3591        let db_path = dir.path().join("events_dual_write.rdb");
3592        let wal_path = db_path.with_extension("rdb-uwal");
3593        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
3594
3595        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
3596            .unwrap();
3597        assert_statement_writes_collections_in_one_new_wal_batch(
3598            &rt,
3599            &wal_path,
3600            "INSERT INTO users (id, email) VALUES (1, 'a@example.test')",
3601            "users",
3602            "users_events",
3603        );
3604    }
3605
3606    #[test]
3607    fn with_events_autocommit_update_persists_mutation_and_event_in_one_wal_batch() {
3608        let dir = tempfile::tempdir().unwrap();
3609        let db_path = dir.path().join("events_update_atomic.rdb");
3610        let wal_path = db_path.with_extension("rdb-uwal");
3611        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
3612
3613        rt.execute_query(
3614            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (UPDATE) TO user_updates",
3615        )
3616        .unwrap();
3617        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
3618            .unwrap();
3619
3620        assert_statement_writes_collections_in_one_new_wal_batch(
3621            &rt,
3622            &wal_path,
3623            "UPDATE users SET email = 'b@example.test' WHERE id = 1",
3624            "users",
3625            "user_updates",
3626        );
3627    }
3628
3629    #[test]
3630    fn with_events_autocommit_delete_persists_mutation_and_event_in_one_wal_batch() {
3631        let dir = tempfile::tempdir().unwrap();
3632        let db_path = dir.path().join("events_delete_atomic.rdb");
3633        let wal_path = db_path.with_extension("rdb-uwal");
3634        let rt = RedDBRuntime::with_options(RedDBOptions::persistent(&db_path)).unwrap();
3635
3636        rt.execute_query(
3637            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (DELETE) TO user_deletes",
3638        )
3639        .unwrap();
3640        rt.execute_query("INSERT INTO users (id, email) VALUES (1, 'a@example.test')")
3641            .unwrap();
3642
3643        assert_statement_writes_collections_in_one_new_wal_batch(
3644            &rt,
3645            &wal_path,
3646            "DELETE FROM users WHERE id = 1",
3647            "users",
3648            "user_deletes",
3649        );
3650    }
3651
3652    #[test]
3653    fn update_where_id_in_with_hash_index_updates_expected_rows() {
3654        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3655        rt.execute_query("CREATE TABLE users (id INT, score INT)")
3656            .unwrap();
3657        for id in 0..5 {
3658            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
3659                .unwrap();
3660        }
3661        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
3662            .unwrap();
3663
3664        let updated = rt
3665            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
3666            .unwrap();
3667        assert_eq!(updated.affected_rows, 3);
3668
3669        let selected = rt
3670            .execute_query("SELECT id, score FROM users ORDER BY id")
3671            .unwrap();
3672        let scores: Vec<(i64, i64)> = selected
3673            .result
3674            .records
3675            .iter()
3676            .map(|record| {
3677                let id = match record.get("id").unwrap() {
3678                    Value::Integer(value) => *value,
3679                    other => panic!("expected integer id, got {other:?}"),
3680                };
3681                let score = match record.get("score").unwrap() {
3682                    Value::Integer(value) => *value,
3683                    other => panic!("expected integer score, got {other:?}"),
3684                };
3685                (id, score)
3686            })
3687            .collect();
3688        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
3689    }
3690
    /// Drives UPDATE through the shared `DmlTargetScan` module — the
    /// same code path DELETE uses (#51, #52). Exercises the indexed
    /// equality fast-path (WHERE id = N with a HASH index), the
    /// unindexed range scan (WHERE score > N), and the no-WHERE
    /// full-scan branch to confirm the extracted "find target rows"
    /// loop preserves affected-row counts and the resulting row state.
    #[test]
    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE items (id INT, score INT)")
            .unwrap();
        // Seed rows (id, score) = (0,0) (1,10) (2,20) (3,30) (4,40).
        for id in 0..5 {
            rt.execute_query(&format!(
                "INSERT INTO items (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
            .unwrap();

        // Indexed equality UPDATE — hits the hash fast-path inside
        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
        // below the score>25 cutoff so the next assertion stays clean.
        let updated_one = rt
            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
            .unwrap();
        assert_eq!(updated_one.affected_rows, 1);

        // Unindexed scan UPDATE — bumps everyone with score > 25,
        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
        // zoned/full-scan branch.
        let updated_many = rt
            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
            .unwrap();
        assert_eq!(updated_many.affected_rows, 2);

        // Verify the combined effect of both UPDATEs, row by row.
        let snapshot = rt
            .execute_query("SELECT id, score FROM items ORDER BY id")
            .unwrap();
        let pairs: Vec<(i64, i64)> = snapshot
            .result
            .records
            .iter()
            .map(|record| {
                let id = match record.get("id").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer id, got {other:?}"),
                };
                let score = match record.get("score").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer score, got {other:?}"),
                };
                (id, score)
            })
            .collect();
        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);

        // Full-scan UPDATE with no WHERE rewrites every remaining row.
        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
        assert_eq!(updated_all.affected_rows, 5);
        let after = rt
            .execute_query("SELECT score FROM items ORDER BY id")
            .unwrap();
        let scores: Vec<i64> = after
            .result
            .records
            .iter()
            .map(|record| match record.get("score").unwrap() {
                Value::Integer(value) => *value,
                other => panic!("expected integer score, got {other:?}"),
            })
            .collect();
        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
    }
3766
3767    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
3768    /// both the index fast-path (WHERE id = N with a HASH index) and
3769    /// the unindexed scan path (WHERE score > N) to confirm the
3770    /// extracted "find target rows" loop preserves the affected-row
3771    /// count and which rows survive.
3772    #[test]
3773    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
3774        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3775        rt.execute_query("CREATE TABLE items (id INT, score INT)")
3776            .unwrap();
3777        for id in 0..5 {
3778            rt.execute_query(&format!(
3779                "INSERT INTO items (id, score) VALUES ({id}, {})",
3780                id * 10
3781            ))
3782            .unwrap();
3783        }
3784        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
3785            .unwrap();
3786
3787        // Indexed equality DELETE — hits the hash fast-path inside
3788        // DmlTargetScan::find_target_ids.
3789        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
3790        assert_eq!(deleted_one.affected_rows, 1);
3791
3792        // Unindexed scan DELETE — drops everyone with score > 25,
3793        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
3794        // zoned/full-scan branch.
3795        let deleted_many = rt
3796            .execute_query("DELETE FROM items WHERE score > 25")
3797            .unwrap();
3798        assert_eq!(deleted_many.affected_rows, 2);
3799
3800        let surviving = rt
3801            .execute_query("SELECT id FROM items ORDER BY id")
3802            .unwrap();
3803        let ids: Vec<i64> = surviving
3804            .result
3805            .records
3806            .iter()
3807            .map(|record| match record.get("id").unwrap() {
3808                Value::Integer(value) => *value,
3809                other => panic!("expected integer id, got {other:?}"),
3810            })
3811            .collect();
3812        assert_eq!(ids, vec![0, 1]);
3813
3814        // Sanity: full-scan DELETE with no WHERE clears the rest.
3815        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
3816        assert_eq!(deleted_rest.affected_rows, 2);
3817        let empty = rt.execute_query("SELECT id FROM items").unwrap();
3818        assert!(empty.result.records.is_empty());
3819    }
3820
3821    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
3822    /// INSERT but reject UPDATE and DELETE with the documented
3823    /// operator-facing error strings. Drives all three DML verbs so
3824    /// the centralized gate is exercised end-to-end.
3825    #[test]
3826    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
3827        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3828        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
3829            .unwrap();
3830
3831        // INSERT must succeed — APPEND ONLY exists precisely to allow
3832        // appends. The gate should be a no-op for INSERT.
3833        let inserted = rt
3834            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
3835            .unwrap();
3836        assert_eq!(inserted.affected_rows, 1);
3837
3838        // UPDATE is rejected with the gate's UPDATE-specific message.
3839        let update_err = rt
3840            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
3841            .unwrap_err();
3842        let msg = format!("{update_err}");
3843        assert!(
3844            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
3845            "expected UPDATE rejection message, got: {msg}"
3846        );
3847
3848        // DELETE is rejected with the gate's DELETE-specific message.
3849        let delete_err = rt
3850            .execute_query("DELETE FROM events WHERE id = 1")
3851            .unwrap_err();
3852        let msg = format!("{delete_err}");
3853        assert!(
3854            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
3855            "expected DELETE rejection message, got: {msg}"
3856        );
3857
3858        // Row should still be present — neither rejected mutation
3859        // touched storage.
3860        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
3861        assert_eq!(surviving.result.records.len(), 1);
3862    }
3863
3864    /// CollectionContract gate: tables without an APPEND ONLY contract
3865    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
3866    /// is a true pass-through, not an accidental block.
3867    #[test]
3868    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
3869        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3870        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
3871            .unwrap();
3872
3873        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
3874            .unwrap();
3875        let updated = rt
3876            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
3877            .unwrap();
3878        assert_eq!(updated.affected_rows, 1);
3879        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
3880        assert_eq!(deleted.affected_rows, 1);
3881    }
3882
3883    #[test]
3884    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
3885        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3886        rt.execute_query(
3887            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
3888        )
3889        .unwrap();
3890
3891        let inserted = rt
3892            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
3893            .unwrap();
3894        assert_eq!(inserted.affected_rows, 1);
3895
3896        let events = queue_payloads(&rt, "audit_log");
3897        assert_eq!(events.len(), 1);
3898        let event = events[0].as_object().expect("event payload object");
3899        assert!(event
3900            .get("event_id")
3901            .and_then(crate::json::Value::as_str)
3902            .is_some_and(|value| !value.is_empty()));
3903        assert_eq!(
3904            event.get("op").and_then(crate::json::Value::as_str),
3905            Some("insert")
3906        );
3907        assert_eq!(
3908            event.get("collection").and_then(crate::json::Value::as_str),
3909            Some("users")
3910        );
3911        assert_eq!(
3912            event.get("id").and_then(crate::json::Value::as_u64),
3913            Some(7)
3914        );
3915        assert!(event
3916            .get("ts")
3917            .and_then(crate::json::Value::as_u64)
3918            .is_some());
3919        assert!(event
3920            .get("lsn")
3921            .and_then(crate::json::Value::as_u64)
3922            .is_some());
3923        assert!(matches!(
3924            event.get("tenant"),
3925            Some(crate::json::Value::Null)
3926        ));
3927        assert!(matches!(
3928            event.get("before"),
3929            Some(crate::json::Value::Null)
3930        ));
3931        let after = event
3932            .get("after")
3933            .and_then(crate::json::Value::as_object)
3934            .expect("after object");
3935        assert_eq!(
3936            after.get("id").and_then(crate::json::Value::as_u64),
3937            Some(7)
3938        );
3939        assert_eq!(
3940            after.get("email").and_then(crate::json::Value::as_str),
3941            Some("a@example.com")
3942        );
3943    }
3944
3945    #[test]
3946    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
3947        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3948        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
3949            .unwrap();
3950
3951        rt.execute_query(
3952            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
3953        )
3954        .unwrap();
3955
3956        let events = queue_payloads(&rt, "users_events");
3957        assert_eq!(events.len(), 2);
3958        let mut previous_lsn = 0;
3959        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
3960            let object = event.as_object().expect("event payload object");
3961            assert_eq!(
3962                object.get("op").and_then(crate::json::Value::as_str),
3963                Some("insert")
3964            );
3965            assert_eq!(
3966                object.get("id").and_then(crate::json::Value::as_u64),
3967                Some(expected_id)
3968            );
3969            let lsn = object
3970                .get("lsn")
3971                .and_then(crate::json::Value::as_u64)
3972                .expect("event lsn");
3973            assert!(
3974                lsn > previous_lsn,
3975                "event LSNs should increase in row order"
3976            );
3977            previous_lsn = lsn;
3978            let after = object
3979                .get("after")
3980                .and_then(crate::json::Value::as_object)
3981                .expect("after object");
3982            assert_eq!(
3983                after.get("id").and_then(crate::json::Value::as_u64),
3984                Some(expected_id)
3985            );
3986        }
3987    }
3988
3989    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
3990        let result = rt
3991            .execute_query(&format!("QUEUE PEEK {queue} 10"))
3992            .expect("peek queue");
3993        result
3994            .result
3995            .records
3996            .iter()
3997            .map(
3998                |record| match record.get("payload").expect("payload column") {
3999                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
4000                    other => panic!("expected JSON queue payload, got {other:?}"),
4001                },
4002            )
4003            .collect()
4004    }
4005
4006    // ── #112: auto-index user `id` on first insert ─────────────────────
4007
4008    /// First insert into a fresh collection that carries a column named
4009    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
4010    /// populate it transparently, and `WHERE id = N` lookups exercise
4011    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
4012    ///
4013    /// This is the load-bearing acceptance test for #112 — without the
4014    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
4015    /// fall through to a full segment scan (the 4× perf gap documented
4016    /// in `docs/perf/delete-sequential-2026-05-06.md`).
4017    #[test]
4018    fn auto_index_id_fires_on_first_insert() {
4019        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4020        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
4021            .unwrap();
4022
4023        // Pre-condition: no index on `id` yet.
4024        assert!(
4025            rt.index_store_ref()
4026                .find_index_for_column("bench_users", "id")
4027                .is_none(),
4028            "freshly created collection should not have an `id` index"
4029        );
4030
4031        // Single-row INSERT — drives `MutationEngine::append_one`.
4032        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
4033            .unwrap();
4034
4035        // Post-condition: hash index registered on `id`.
4036        let registered = rt
4037            .index_store_ref()
4038            .find_index_for_column("bench_users", "id")
4039            .expect("auto-index hook should have registered idx_id on first insert");
4040        assert_eq!(registered.name, "idx_id");
4041        assert_eq!(registered.collection, "bench_users");
4042        assert_eq!(registered.columns, vec!["id".to_string()]);
4043        assert!(matches!(
4044            registered.method,
4045            super::super::index_store::IndexMethodKind::Hash
4046        ));
4047
4048        // Subsequent inserts populate the index; `WHERE id = N` should
4049        // resolve via the hash fast path and round-trip every row.
4050        for id in 2..=5 {
4051            rt.execute_query(&format!(
4052                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
4053                id * 10
4054            ))
4055            .unwrap();
4056        }
4057        for id in 1..=5 {
4058            let result = rt
4059                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
4060                .unwrap();
4061            assert_eq!(
4062                result.result.records.len(),
4063                1,
4064                "id={id} should match one row"
4065            );
4066        }
4067
4068        // Delete via the hash fast-path — exactly the bench scenario the
4069        // perf doc identified as the 4× regression. With the index
4070        // present, `find_target_ids` short-circuits before
4071        // `for_each_entity_zoned` runs.
4072        let deleted = rt
4073            .execute_query("DELETE FROM bench_users WHERE id = 3")
4074            .unwrap();
4075        assert_eq!(deleted.affected_rows, 1);
4076    }
4077
4078    /// Bulk INSERT (the multi-row VALUES path) drives
4079    /// `MutationEngine::append_batch`. The hook must fire there too —
4080    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
4081    /// wire bulk INSERT) skip auto-indexing entirely.
4082    #[test]
4083    fn auto_index_id_fires_on_first_bulk_insert() {
4084        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4085        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
4086            .unwrap();
4087
4088        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
4089            .unwrap();
4090
4091        let registered = rt
4092            .index_store_ref()
4093            .find_index_for_column("bench_bulk", "id")
4094            .expect("auto-index hook should fire on first bulk insert");
4095        assert_eq!(registered.name, "idx_id");
4096
4097        // Every row populated via `index_entity_insert_batch`.
4098        for id in 1..=3 {
4099            let result = rt
4100                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
4101                .unwrap();
4102            assert_eq!(result.result.records.len(), 1);
4103        }
4104    }
4105
4106    /// Hook is a no-op when the row carries no `id` column. Conservative
4107    /// match (case-sensitive `id`) — `Id`, `ID`, and `red_entity_id`
4108    /// don't trigger it.
4109    #[test]
4110    fn auto_index_id_skips_when_no_id_column() {
4111        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4112        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
4113            .unwrap();
4114        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
4115            .unwrap();
4116
4117        assert!(rt
4118            .index_store_ref()
4119            .find_index_for_column("plain", "id")
4120            .is_none());
4121        assert!(rt
4122            .index_store_ref()
4123            .find_index_for_column("plain", "uid")
4124            .is_none());
4125    }
4126
4127    /// Hook only fires once per collection. If an explicit
4128    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
4129    /// detects it via `find_index_for_column` and does NOT clobber it
4130    /// with a HASH index on the next insert.
4131    #[test]
4132    fn auto_index_id_skips_when_index_already_exists() {
4133        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4134        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
4135            .unwrap();
4136        // User-declared BTREE index on `id` before any insert.
4137        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
4138            .unwrap();
4139        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
4140            .unwrap();
4141
4142        let registered = rt
4143            .index_store_ref()
4144            .find_index_for_column("pre", "id")
4145            .expect("user index should still be there");
4146        assert_eq!(
4147            registered.name, "user_idx",
4148            "auto-index hook must not overwrite an existing index"
4149        );
4150    }
4151
4152    /// Implicit `idx_id` is reaped when the collection drops. The
4153    /// existing `execute_drop_table` walks `list_indices` and drops every
4154    /// entry — confirm the auto-created index participates.
4155    #[test]
4156    fn auto_index_id_dropped_with_collection() {
4157        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4158        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
4159            .unwrap();
4160        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
4161            .unwrap();
4162        assert!(rt
4163            .index_store_ref()
4164            .find_index_for_column("ephemeral", "id")
4165            .is_some());
4166
4167        rt.execute_query("DROP TABLE ephemeral").unwrap();
4168
4169        assert!(
4170            rt.index_store_ref()
4171                .find_index_for_column("ephemeral", "id")
4172                .is_none(),
4173            "implicit `idx_id` must be reaped when its collection drops"
4174        );
4175    }
4176
4177    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
4178    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
4179    /// off, first insert leaves the collection without an `id` index —
4180    /// DELETE/UPDATE fall back to the scan path.
4181    #[test]
4182    fn auto_index_id_disabled_by_config() {
4183        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
4184        let rt = RedDBRuntime::with_options(opts).unwrap();
4185
4186        rt.execute_query("CREATE TABLE off (id INT, score INT)")
4187            .unwrap();
4188        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
4189            .unwrap();
4190
4191        assert!(
4192            rt.index_store_ref()
4193                .find_index_for_column("off", "id")
4194                .is_none(),
4195            "with auto_index_id=false, no implicit index should be created"
4196        );
4197    }
4198
4199    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
4200
4201    #[test]
4202    fn update_single_row_emits_update_event() {
4203        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4204        rt.execute_query(
4205            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
4206        )
4207        .unwrap();
4208        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4209            .unwrap();
4210
4211        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4212            .unwrap();
4213
4214        let events = queue_payloads(&rt, "audit_log");
4215        assert_eq!(events.len(), 1, "expected exactly 1 update event");
4216        let event = events[0].as_object().expect("event payload object");
4217        assert_eq!(
4218            event.get("op").and_then(crate::json::Value::as_str),
4219            Some("update")
4220        );
4221        assert_eq!(
4222            event.get("collection").and_then(crate::json::Value::as_str),
4223            Some("users")
4224        );
4225        assert!(event
4226            .get("event_id")
4227            .and_then(crate::json::Value::as_str)
4228            .is_some_and(|v| !v.is_empty()));
4229        let before = event
4230            .get("before")
4231            .and_then(crate::json::Value::as_object)
4232            .expect("before must be an object");
4233        let after = event
4234            .get("after")
4235            .and_then(crate::json::Value::as_object)
4236            .expect("after must be an object");
4237        assert_eq!(
4238            before.get("name").and_then(crate::json::Value::as_str),
4239            Some("Alice"),
4240            "before.name should be the old value"
4241        );
4242        assert_eq!(
4243            after.get("name").and_then(crate::json::Value::as_str),
4244            Some("Bob"),
4245            "after.name should be the new value"
4246        );
4247    }
4248
4249    #[test]
4250    fn update_event_only_includes_changed_fields() {
4251        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4252        rt.execute_query(
4253            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
4254        )
4255        .unwrap();
4256        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
4257            .unwrap();
4258
4259        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
4260            .unwrap();
4261
4262        let events = queue_payloads(&rt, "evts");
4263        assert_eq!(events.len(), 1);
4264        let event = events[0].as_object().unwrap();
4265        let before = event
4266            .get("before")
4267            .and_then(crate::json::Value::as_object)
4268            .unwrap();
4269        let after = event
4270            .get("after")
4271            .and_then(crate::json::Value::as_object)
4272            .unwrap();
4273        // Only changed field included.
4274        assert!(
4275            before.contains_key("name"),
4276            "before must include changed field"
4277        );
4278        assert!(
4279            after.contains_key("name"),
4280            "after must include changed field"
4281        );
4282        // Unchanged fields must not appear.
4283        assert!(
4284            !before.contains_key("email"),
4285            "before must not include unchanged email"
4286        );
4287        assert!(
4288            !after.contains_key("email"),
4289            "after must not include unchanged email"
4290        );
4291    }
4292
4293    #[test]
4294    fn multi_row_update_emits_one_event_per_row() {
4295        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4296        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
4297            .unwrap();
4298        rt.execute_query(
4299            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
4300        )
4301        .unwrap();
4302
4303        rt.execute_query("UPDATE items SET status = 'done'")
4304            .unwrap();
4305
4306        let events = queue_payloads(&rt, "evts");
4307        assert_eq!(events.len(), 3, "expected one update event per row");
4308        for event in &events {
4309            let obj = event.as_object().unwrap();
4310            assert_eq!(
4311                obj.get("op").and_then(crate::json::Value::as_str),
4312                Some("update")
4313            );
4314        }
4315    }
4316
4317    #[test]
4318    fn delete_single_row_emits_delete_event() {
4319        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4320        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
4321            .unwrap();
4322        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
4323            .unwrap();
4324
4325        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
4326
4327        let events = queue_payloads(&rt, "del_log");
4328        assert_eq!(events.len(), 1);
4329        let event = events[0].as_object().expect("event payload object");
4330        assert_eq!(
4331            event.get("op").and_then(crate::json::Value::as_str),
4332            Some("delete")
4333        );
4334        assert_eq!(
4335            event.get("collection").and_then(crate::json::Value::as_str),
4336            Some("users")
4337        );
4338        assert!(event
4339            .get("event_id")
4340            .and_then(crate::json::Value::as_str)
4341            .is_some_and(|v| !v.is_empty()));
4342        let before = event
4343            .get("before")
4344            .and_then(crate::json::Value::as_object)
4345            .expect("before must be an object for delete");
4346        assert_eq!(
4347            before.get("id").and_then(crate::json::Value::as_u64),
4348            Some(42)
4349        );
4350        assert_eq!(
4351            before.get("name").and_then(crate::json::Value::as_str),
4352            Some("Alice")
4353        );
4354        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
4355    }
4356
4357    #[test]
4358    fn multi_row_delete_emits_one_event_per_row() {
4359        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4360        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
4361            .unwrap();
4362        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
4363            .unwrap();
4364
4365        rt.execute_query("DELETE FROM items").unwrap();
4366
4367        let events = queue_payloads(&rt, "del_log");
4368        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
4369        for event in &events {
4370            let obj = event.as_object().unwrap();
4371            assert_eq!(
4372                obj.get("op").and_then(crate::json::Value::as_str),
4373                Some("delete")
4374            );
4375            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
4376        }
4377    }
4378
4379    #[test]
4380    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
4381        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4382        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
4383            .unwrap();
4384
4385        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4386            .unwrap();
4387        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
4388
4389        let events = queue_payloads(&rt, "evts");
4390        assert!(
4391            events.is_empty(),
4392            "UPDATE-only filter must not emit INSERT or DELETE events"
4393        );
4394    }
4395
4396    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
4397
4398    #[test]
4399    fn suppress_events_on_insert_emits_no_events() {
4400        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4401        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4402            .unwrap();
4403
4404        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4405            .unwrap();
4406
4407        let events = queue_payloads(&rt, "evts");
4408        assert!(
4409            events.is_empty(),
4410            "SUPPRESS EVENTS must prevent INSERT events"
4411        );
4412    }
4413
4414    #[test]
4415    fn suppress_events_on_update_emits_no_events() {
4416        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4417        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4418            .unwrap();
4419        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
4420            .unwrap();
4421        // drain the INSERT event
4422        let _ = queue_payloads(&rt, "evts");
4423        // Force pop to drain; simpler: just check new count after UPDATE
4424        rt.execute_query("QUEUE PURGE evts").unwrap();
4425
4426        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
4427            .unwrap();
4428
4429        let events = queue_payloads(&rt, "evts");
4430        assert!(
4431            events.is_empty(),
4432            "SUPPRESS EVENTS must prevent UPDATE events"
4433        );
4434    }
4435
4436    #[test]
4437    fn suppress_events_on_delete_emits_no_events() {
4438        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4439        rt.execute_query(
4440            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
4441        )
4442        .unwrap();
4443        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4444            .unwrap();
4445
4446        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
4447            .unwrap();
4448
4449        let events = queue_payloads(&rt, "evts");
4450        assert!(
4451            events.is_empty(),
4452            "SUPPRESS EVENTS must prevent DELETE events"
4453        );
4454    }
4455
4456    #[test]
4457    fn normal_insert_after_suppress_still_emits() {
4458        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
4459        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
4460            .unwrap();
4461
4462        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
4463            .unwrap();
4464        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
4465            .unwrap();
4466
4467        let events = queue_payloads(&rt, "evts");
4468        assert_eq!(
4469            events.len(),
4470            1,
4471            "only the non-suppressed INSERT should emit"
4472        );
4473        assert_eq!(
4474            events[0].get("id").and_then(crate::json::Value::as_u64),
4475            Some(2)
4476        );
4477    }
4478}