// reddb_server/runtime/impl_dml.rs

//! DML execution: INSERT, UPDATE, DELETE via SQL AST
//!
//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
//! operations through the existing `RuntimeEntityPort` trait so that all
//! cross-cutting concerns (WAL, indexing, replication) are automatically
//! applied.
9use crate::application::entity::{
10    metadata_from_json, AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput, CreateKvInput,
11    CreateNodeInput, CreateRowInput, CreateRowsBatchInput, CreateVectorInput, DeleteEntityInput,
12    PatchEntityOperation, PatchEntityOperationType, RowUpdateColumnRule, RowUpdateContractPlan,
13};
14use crate::application::ports::{
15    build_row_update_contract_plan, entity_row_fields_snapshot,
16    normalize_row_update_assignment_with_plan, normalize_row_update_value_for_rule,
17    RuntimeEntityPort,
18};
19use crate::application::ttl_payload::has_internal_ttl_metadata;
20use crate::presentation::entity_json::storage_value_to_json;
21use crate::storage::query::ast::{Expr, ReturningItem};
22use crate::storage::query::sql_lowering::{
23    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
24};
25use crate::storage::query::unified::{sys_key_red_entity_id, UnifiedRecord, UnifiedResult};
26use crate::storage::unified::MetadataValue;
27use crate::storage::Metadata;
28use std::collections::HashMap;
29use std::sync::Arc;
30
31use super::*;
32
/// Number of applied UPDATE mutations processed per chunk before the
/// persist/flush helpers below run (bounds per-chunk buffering).
const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
/// Edge label reserved for the tree subsystem; plain graph INSERTs are
/// rejected when they try to use it (see `ensure_non_tree_structural_edge_label`).
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
/// Metadata key namespace reserved for the tree subsystem (checked by
/// `ensure_non_tree_reserved_metadata_entries`).
const TREE_METADATA_PREFIX: &str = "red.tree.";
36
/// One `SET column = expr` assignment from an UPDATE statement, in the
/// per-row (dynamic) form kept by `CompiledUpdatePlan`.
#[derive(Clone)]
struct CompiledUpdateAssignment {
    // Target column name as written in the SQL statement.
    column: String,
    // Right-hand-side expression, evaluated per row (assignments that
    // fold to constants live in the plan's static lists instead).
    expr: Expr,
    // When set, the assignment targets a reserved metadata key rather
    // than a row field; `&'static str` suggests the key comes from a
    // compile-time table — TODO confirm at the plan-builder site.
    metadata_key: Option<&'static str>,
    // Row-contract normalization rule for this column, when the target
    // collection declares one (see `RowUpdateColumnRule`).
    row_rule: Option<RowUpdateColumnRule>,
}
44
/// Compiled form of an UPDATE's SET list: assignments split into
/// statically-folded constants vs. per-row dynamic expressions, plus
/// the row-contract information needed while applying them.
struct CompiledUpdatePlan {
    // Field assignments whose RHS folded to a constant `Value`.
    static_field_assignments: Vec<(String, Value)>,
    // Metadata assignments whose RHS folded to a constant.
    static_metadata_assignments: Vec<(String, MetadataValue)>,
    // Assignments that still require per-row expression evaluation.
    dynamic_assignments: Vec<CompiledUpdateAssignment>,
    // Contract plan for the target collection, when one is declared
    // (built via `build_row_update_contract_plan`).
    row_contract_plan: Option<RowUpdateContractPlan>,
    // Row columns this UPDATE writes.
    row_modified_columns: Vec<String>,
    // True when a modified column participates in a unique constraint —
    // presumably gates uniqueness re-checks; confirm at call sites.
    row_touches_unique_columns: bool,
}
53
/// Per-row result of evaluating a `CompiledUpdatePlan`'s dynamic
/// assignments: the concrete field and metadata writes for one entity.
#[derive(Default)]
struct MaterializedUpdateAssignments {
    // Field writes produced by per-row expression evaluation.
    dynamic_field_assignments: Vec<(String, Value)>,
    // Metadata writes produced by per-row expression evaluation.
    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
}
59
60impl RedDBRuntime {
61    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
62    /// target table is tenant-scoped and the user's column list does
63    /// not already name the tenant column.
64    ///
65    /// Returns:
66    /// * `Ok(None)` — no injection needed (non-tenant table, or user
67    ///   supplied the column explicitly). Caller uses the original
68    ///   query unchanged.
69    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
70    ///   + literal value appended to every row.
71    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
72    ///   the current session. Fails loudly so callers don't produce
73    ///   rows that RLS would then hide on read.
74    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
75        let Some(tenant_col) = self.tenant_column(&query.table) else {
76            return Ok(None);
77        };
78        // User already named the column (literal match) — trust them.
79        if query
80            .columns
81            .iter()
82            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
83        {
84            return Ok(None);
85        }
86
87        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
88        // nested key like `headers.tenant` we operate on the root
89        // column (`headers`) and set / add the nested path inside its
90        // JSON value. If the user named the root column we mutate in
91        // place; otherwise we create a fresh JSON column for every row.
92        if let Some(dot_pos) = tenant_col.find('.') {
93            let (root, tail) = tenant_col.split_at(dot_pos);
94            let tail = &tail[1..]; // drop leading '.'
95            return self.inject_dotted_tenant(query, root, tail);
96        }
97
98        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
99            return Err(RedDBError::Query(format!(
100                "INSERT into tenant-scoped table '{}' requires an active tenant — \
101                 run SET TENANT '<id>' first or name column '{}' explicitly",
102                query.table, tenant_col
103            )));
104        };
105
106        let mut augmented = query.clone();
107        augmented.columns.push(tenant_col);
108        let lit = Value::text(tenant_id.clone());
109        for row in augmented.values.iter_mut() {
110            row.push(lit.clone());
111        }
112        for row in augmented.value_exprs.iter_mut() {
113            row.push(crate::storage::query::ast::Expr::Literal {
114                value: lit.clone(),
115                span: crate::storage::query::ast::Span::synthetic(),
116            });
117        }
118        Ok(Some(augmented))
119    }
120
    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
    /// nested JSON instead of appending a flat column.
    ///
    /// Cases:
    /// * Root column already in the INSERT list → mutate per-row JSON
    ///   (parse, set path, re-serialize).
    /// * Root column absent → create a fresh `{tail: tenant}` JSON
    ///   object and append the root column to the INSERT.
    ///
    /// Errors only when some row actually needs filling and no tenant
    /// is bound — rows whose dotted tail is already set never require a
    /// session tenant.
    fn inject_dotted_tenant(
        &self,
        query: &InsertQuery,
        root: &str,
        tail: &str,
    ) -> RedDBResult<Option<InsertQuery>> {
        // Fetched once up front; only *consulted* lazily inside the
        // loops so an unbound tenant errors only when needed.
        let active_tenant = crate::runtime::impl_core::current_tenant();
        let mut augmented = query.clone();
        // Case-insensitive position of the root column, if the user
        // supplied it.
        let root_idx = augmented
            .columns
            .iter()
            .position(|c| c.eq_ignore_ascii_case(root));

        if let Some(idx) = root_idx {
            // User supplied the root column. Per-row: if the dotted
            // tail is already present we trust the user (admin / bulk
            // loader scenario); otherwise fill from the active
            // tenant. An unbound tenant is only an error when some
            // row actually needs filling.
            for row in augmented.values.iter_mut() {
                let Some(slot) = row.get_mut(idx) else {
                    continue;
                };
                if dotted_tail_already_set(slot, tail) {
                    continue;
                }
                let Some(tenant_id) = &active_tenant else {
                    return Err(RedDBError::Query(format!(
                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
                        query.table, root, tail
                    )));
                };
                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
            }
            // Expression row is kept in sync by re-wrapping the
            // mutated literal; the canonical path will re-evaluate
            // against the same JSON shape. Note the index-based lookup
            // (not zip): an expression row with no matching literal
            // row falls back to Null.
            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
                if let Some(slot) = row.get_mut(idx) {
                    let new_value = augmented
                        .values
                        .get(row_idx)
                        .and_then(|v| v.get(idx))
                        .cloned()
                        .unwrap_or(Value::Null);
                    *slot = crate::storage::query::ast::Expr::Literal {
                        value: new_value,
                        span: crate::storage::query::ast::Span::synthetic(),
                    };
                }
            }
        } else {
            // No root column in the INSERT list — auto-fill needs a
            // bound tenant to synthesise one. Error loud so we never
            // create a tenant-less row that RLS would then hide.
            let Some(tenant_id) = &active_tenant else {
                return Err(RedDBError::Query(format!(
                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
                    query.table, root, tail
                )));
            };
            // Create a fresh JSON column with only the tenant path set,
            // appended to every literal row and every expression row.
            augmented.columns.push(root.to_string());
            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
            for row in augmented.values.iter_mut() {
                row.push(fresh.clone());
            }
            for row in augmented.value_exprs.iter_mut() {
                row.push(crate::storage::query::ast::Expr::Literal {
                    value: fresh.clone(),
                    span: crate::storage::query::ast::Span::synthetic(),
                });
            }
        }

        Ok(Some(augmented))
    }
209
210    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
211    /// `lsns` is empty because events fire at commit time.
212    fn delete_entities_batch(
213        &self,
214        collection: &str,
215        ids: &[EntityId],
216    ) -> RedDBResult<(u64, Vec<u64>)> {
217        if ids.is_empty() {
218            return Ok((0, vec![]));
219        }
220
221        let store = self.db().store();
222        let Some(manager) = store.get_collection(collection) else {
223            return Ok((0, vec![]));
224        };
225
226        let active_xid = self.current_xid();
227        let conn_id = crate::runtime::impl_core::current_connection_id();
228        let mut autocommit_xid = None;
229        let mut tombstoned_ids = Vec::new();
230        let mut tombstoned_entities = Vec::new();
231        let mut physical_delete_ids = Vec::new();
232
233        for &id in ids {
234            let Some(mut entity) = manager.get(id) else {
235                continue;
236            };
237            if matches!(entity.data, EntityData::Row(_)) {
238                let previous_xmax = entity.xmax;
239                // Skip if this tuple was already tombstoned by a prior
240                // statement in the same txn — idempotent DELETE.
241                if entity.xmax != 0 {
242                    continue;
243                }
244
245                let xid = match active_xid {
246                    Some(xid) => xid,
247                    None => match autocommit_xid {
248                        Some(xid) => xid,
249                        None => {
250                            let mgr = self.snapshot_manager();
251                            let xid = mgr.begin();
252                            autocommit_xid = Some(xid);
253                            xid
254                        }
255                    },
256                };
257                entity.set_xmax(xid);
258                if manager.update(entity.clone()).is_ok() {
259                    if active_xid.is_some() {
260                        self.record_pending_tombstone(conn_id, collection, id, xid, previous_xmax);
261                    }
262                    tombstoned_entities.push(entity);
263                    tombstoned_ids.push(id);
264                }
265            } else {
266                physical_delete_ids.push(id);
267            }
268        }
269
270        if let Some(xid) = autocommit_xid {
271            self.snapshot_manager().commit(xid);
272        }
273
274        let mut affected = tombstoned_ids.len() as u64;
275        let mut lsns = Vec::with_capacity(tombstoned_ids.len() + physical_delete_ids.len());
276        if active_xid.is_some() {
277            store
278                .persist_entities_to_pager(collection, &tombstoned_entities)
279                .map_err(|err| RedDBError::Internal(err.to_string()))?;
280        } else {
281            store
282                .persist_entities_to_pager(collection, &tombstoned_entities)
283                .map_err(|err| RedDBError::Internal(err.to_string()))?;
284            for id in &tombstoned_ids {
285                store.context_index().remove_entity(*id);
286                let lsn = self.cdc_emit(
287                    crate::replication::cdc::ChangeOperation::Delete,
288                    collection,
289                    id.raw(),
290                    "entity",
291                );
292                lsns.push(lsn);
293            }
294        }
295
296        let deleted_ids = store
297            .delete_batch(collection, &physical_delete_ids)
298            .map_err(|err| RedDBError::Internal(err.to_string()))?;
299        affected += deleted_ids.len() as u64;
300        for id in &deleted_ids {
301            store.context_index().remove_entity(*id);
302            let lsn = self.cdc_emit(
303                crate::replication::cdc::ChangeOperation::Delete,
304                collection,
305                id.raw(),
306                "entity",
307            );
308            lsns.push(lsn);
309        }
310
311        Ok((affected, lsns))
312    }
313
314    /// Flushes context-index updates and CDC for each applied mutation.
315    /// Returns one LSN per entity in the same order as `applied`.
316    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<Vec<u64>> {
317        if applied.is_empty() {
318            return Ok(Vec::new());
319        }
320
321        let store = self.db().store();
322        if applied.iter().any(|item| item.context_index_dirty) {
323            store.context_index().index_entities(
324                &applied[0].collection,
325                applied
326                    .iter()
327                    .filter(|item| item.context_index_dirty)
328                    .map(|item| &item.entity),
329            );
330        }
331
332        for item in applied {
333            self.refresh_update_secondary_indexes(item)?;
334        }
335
336        let mut lsns = Vec::with_capacity(applied.len());
337        for item in applied {
338            let lsn = self.cdc_emit_prebuilt(
339                crate::replication::cdc::ChangeOperation::Update,
340                &item.collection,
341                &item.entity,
342                "entity",
343                item.metadata.as_ref(),
344                false,
345            );
346            lsns.push(lsn);
347        }
348        Ok(lsns)
349    }
350
    /// Persists one chunk of applied UPDATE mutations; thin delegation
    /// to `persist_applied_entity_mutations` so the UPDATE path's
    /// chunked flush/persist calls read symmetrically.
    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
        self.persist_applied_entity_mutations(applied)
    }
354
    /// Keeps secondary (column) indexes consistent after one applied
    /// UPDATE mutation.
    ///
    /// Outcomes:
    /// * No-op when the mutation carries no pre- or post-mutation row
    ///   fields, or the collection has no indexed columns.
    /// * Version replacement (`replaced_entity` set): indexed fields of
    ///   the old version are deleted under the old id and indexed
    ///   fields of the new entity inserted under the new id.
    /// * In-place update: a damage vector over pre/post fields decides
    ///   whether any indexed column actually changed before paying for
    ///   `index_entity_update`.
    fn refresh_update_secondary_indexes(&self, applied: &AppliedEntityMutation) -> RedDBResult<()> {
        // No pre-image captured → nothing to diff against.
        if applied.pre_mutation_fields.is_empty() {
            return Ok(());
        }
        // No post-image fields (non-row entity, presumably) → nothing
        // to index.
        let post = entity_row_fields_snapshot(&applied.entity);
        if post.is_empty() {
            return Ok(());
        }

        let indexed_cols = self
            .index_store_ref()
            .indexed_columns_set(&applied.collection);
        if indexed_cols.is_empty() {
            return Ok(());
        }

        // MVCC-style version replacement: delete index entries keyed by
        // the replaced version's id, insert entries for the new entity.
        if let Some(old_version) = applied.replaced_entity.as_ref() {
            let old_index_fields: Vec<(String, crate::storage::schema::Value)> = applied
                .pre_mutation_fields
                .iter()
                .filter(|(col, _)| indexed_cols.contains(col))
                .cloned()
                .collect();
            let new_index_fields: Vec<(String, crate::storage::schema::Value)> = post
                .iter()
                .filter(|(col, _)| indexed_cols.contains(col))
                .cloned()
                .collect();
            if !old_index_fields.is_empty() {
                self.index_store_ref()
                    .index_entity_delete(&applied.collection, old_version.id, &old_index_fields)
                    .map_err(crate::RedDBError::Internal)?;
            }
            if !new_index_fields.is_empty() {
                self.index_store_ref()
                    .index_entity_insert(&applied.collection, applied.entity.id, &new_index_fields)
                    .map_err(crate::RedDBError::Internal)?;
            }
            return Ok(());
        }

        // In-place update: only touch the index store when some indexed
        // column is in the damage (changed-column) set.
        let damage =
            crate::application::entity::row_damage_vector(&applied.pre_mutation_fields, &post);
        if damage
            .touched_columns()
            .into_iter()
            .any(|col| indexed_cols.contains(col))
        {
            self.index_store_ref()
                .index_entity_update(
                    &applied.collection,
                    applied.id,
                    &applied.pre_mutation_fields,
                    &post,
                )
                .map_err(crate::RedDBError::Internal)?;
        }
        Ok(())
    }
414
415    /// Execute INSERT INTO table [entity_type] (cols) VALUES (vals), ...
416    ///
417    /// Each row in `query.values` is zipped with `query.columns` to produce a
418    /// set of named fields, which is then dispatched based on entity_type.
419    pub fn execute_insert(
420        &self,
421        raw_query: &str,
422        query: &InsertQuery,
423    ) -> RedDBResult<RuntimeQueryResult> {
424        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
425        // CollectionContract gate (#49): single entry point for the
426        // operator's collection-level write rules. Today this is a
427        // no-op for INSERT (APPEND ONLY permits insert); routing
428        // through the gate now means future contract bits — versioned,
429        // vault-only writes — plug in once instead of per verb.
430        crate::runtime::collection_contract::CollectionContractGate::check(
431            self,
432            &query.table,
433            crate::runtime::collection_contract::MutationKind::Insert,
434        )?;
435        // Phase 2.5.4 table-scoped tenancy: if the target table is
436        // tenant-scoped and the user didn't name the tenant column,
437        // auto-inject it with the thread-local `CURRENT_TENANT()`
438        // value. When the column is named explicitly we trust the
439        // caller (useful for admin tooling that writes on behalf of
440        // specific tenants). An unbound tenant on an implicit-fill
441        // path errors up front rather than producing a row the RLS
442        // policy would silently hide.
443        let augmented_owned;
444        let query = match self.maybe_inject_tenant_column(query)? {
445            Some(new_q) => {
446                augmented_owned = new_q;
447                &augmented_owned
448            }
449            None => query,
450        };
451        self.check_insert_column_policy(query)?;
452
453        let mut inserted_count: u64 = 0;
454        let effective_rows =
455            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
456
457        // Ensure the collection exists (auto-create on first insert).
458        let store = self.inner.db.store();
459        let _ = store.get_or_create_collection(&query.table);
460        let declared_model = self
461            .db()
462            .collection_contract_arc(&query.table)
463            .map(|contract| contract.declared_model);
464
465        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
466            if query.returning.is_some() {
467                Some(Vec::with_capacity(effective_rows.len()))
468            } else {
469                None
470            };
471        let mut returning_result: Option<UnifiedResult> = None;
472
473        if matches!(query.entity_type, InsertEntityType::Row)
474            && !matches!(
475                declared_model,
476                Some(crate::catalog::CollectionModel::TimeSeries)
477            )
478        {
479            let mut rows = Vec::with_capacity(effective_rows.len());
480            for row_values in &effective_rows {
481                if row_values.len() != query.columns.len() {
482                    return Err(RedDBError::Query(format!(
483                        "INSERT column count ({}) does not match value count ({})",
484                        query.columns.len(),
485                        row_values.len()
486                    )));
487                }
488                let (fields, mut metadata) =
489                    split_insert_metadata(self, &query.columns, row_values)?;
490                merge_with_clauses(
491                    &mut metadata,
492                    query.ttl_ms,
493                    query.expires_at_ms,
494                    &query.with_metadata,
495                );
496                if let Some(snaps) = returning_snapshots.as_mut() {
497                    snaps.push(fields.clone());
498                }
499                rows.push(CreateRowInput {
500                    collection: query.table.clone(),
501                    fields,
502                    metadata,
503                    node_links: Vec::new(),
504                    vector_links: Vec::new(),
505                });
506            }
507            let outputs = self.create_rows_batch(CreateRowsBatchInput {
508                collection: query.table.clone(),
509                rows,
510                suppress_events: query.suppress_events,
511            })?;
512            inserted_count = outputs.len() as u64;
513
514            // Hypertable chunk routing: if this table was declared via
515            // CREATE HYPERTABLE, register each row's time-column value
516            // with the registry so chunk metadata (bounds, row counts,
517            // TTL eligibility) stays current. This is what lets
518            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
519            // retention daemon sweep expired chunks without scanning
520            // every row.
521            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
522                let time_col = &spec.time_column;
523                // Find the column's index in the INSERT column list.
524                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
525                    for row in &effective_rows {
526                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
527                            if *n >= 0 {
528                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
529                            }
530                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
531                            let _ = self.inner.db.hypertables().route(&query.table, *n);
532                        }
533                    }
534                }
535            }
536
537            if let (Some(items), Some(snaps)) =
538                (query.returning.as_ref(), returning_snapshots.take())
539            {
540                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
541            }
542        } else {
543            // Issue #419: surface the inserted entity id on every INSERT path.
544            // For Node/Edge/Vector/Document/Kv we now keep each CreateEntityOutput
545            // so a RETURNING clause (and the unconditional inserted_ids list,
546            // below) can expose the engine-assigned id. TimeSeries (the row
547            // branch in this else) still returns the not-supported error
548            // because create_timeseries_point isn't plumbed through this fn.
549            let mut entity_outputs: Vec<crate::application::entity::CreateEntityOutput> =
550                Vec::with_capacity(effective_rows.len());
551            let mut returning_field_snaps: Vec<Vec<(String, Value)>> = if query.returning.is_some()
552            {
553                Vec::with_capacity(effective_rows.len())
554            } else {
555                Vec::new()
556            };
557            if matches!(
558                query.entity_type,
559                InsertEntityType::Node | InsertEntityType::Edge
560            ) {
561                enum PreparedGraphInsert {
562                    Node {
563                        fields: Vec<(String, Value)>,
564                        input: CreateNodeInput,
565                    },
566                    Edge {
567                        fields: Vec<(String, Value)>,
568                        input: CreateEdgeInput,
569                    },
570                }
571
572                let mut prepared = Vec::with_capacity(effective_rows.len());
573                for row_values in &effective_rows {
574                    if row_values.len() != query.columns.len() {
575                        return Err(RedDBError::Query(format!(
576                            "INSERT column count ({}) does not match value count ({})",
577                            query.columns.len(),
578                            row_values.len()
579                        )));
580                    }
581
582                    match query.entity_type {
583                        InsertEntityType::Node => {
584                            let (node_values, mut metadata) =
585                                split_insert_metadata(self, &query.columns, row_values)?;
586                            merge_with_clauses(
587                                &mut metadata,
588                                query.ttl_ms,
589                                query.expires_at_ms,
590                                &query.with_metadata,
591                            );
592                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
593                            apply_collection_default_ttl_metadata(
594                                self,
595                                &query.table,
596                                &mut metadata,
597                            );
598                            let (columns, values) = pairwise_columns_values(&node_values);
599                            let label = find_column_value_string(&columns, &values, "label")?;
600                            let node_type =
601                                find_column_value_opt_string(&columns, &values, "node_type");
602                            let properties = extract_remaining_properties(
603                                &columns,
604                                &values,
605                                &["label", "node_type"],
606                            );
607                            prepared.push(PreparedGraphInsert::Node {
608                                fields: node_values,
609                                input: CreateNodeInput {
610                                    collection: query.table.clone(),
611                                    label,
612                                    node_type,
613                                    properties,
614                                    metadata,
615                                    embeddings: Vec::new(),
616                                    table_links: Vec::new(),
617                                    node_links: Vec::new(),
618                                },
619                            });
620                        }
621                        InsertEntityType::Edge => {
622                            let (edge_values, mut metadata) =
623                                split_insert_metadata(self, &query.columns, row_values)?;
624                            merge_with_clauses(
625                                &mut metadata,
626                                query.ttl_ms,
627                                query.expires_at_ms,
628                                &query.with_metadata,
629                            );
630                            ensure_non_tree_reserved_metadata_entries(&metadata)?;
631                            apply_collection_default_ttl_metadata(
632                                self,
633                                &query.table,
634                                &mut metadata,
635                            );
636                            let (columns, values) = pairwise_columns_values(&edge_values);
637                            let label = find_column_value_string(&columns, &values, "label")?;
638                            ensure_non_tree_structural_edge_label(&label)?;
639                            let from_id = resolve_edge_endpoint(
640                                self.inner.db.store().as_ref(),
641                                &query.table,
642                                &columns,
643                                &values,
644                                "from",
645                            )?;
646                            let to_id = resolve_edge_endpoint(
647                                self.inner.db.store().as_ref(),
648                                &query.table,
649                                &columns,
650                                &values,
651                                "to",
652                            )?;
653                            let weight = find_column_value_f32_opt(&columns, &values, "weight");
654                            let properties = extract_remaining_properties(
655                                &columns,
656                                &values,
657                                &["label", "from", "to", "weight"],
658                            );
659                            prepared.push(PreparedGraphInsert::Edge {
660                                fields: edge_values,
661                                input: CreateEdgeInput {
662                                    collection: query.table.clone(),
663                                    label,
664                                    from: EntityId::new(from_id),
665                                    to: EntityId::new(to_id),
666                                    weight,
667                                    properties,
668                                    metadata,
669                                },
670                            });
671                        }
672                        _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
673                    }
674                }
675
676                ensure_graph_insert_contract(self, &query.table)?;
677                let mut batch = self.inner.db.batch();
678                for item in prepared {
679                    match item {
680                        PreparedGraphInsert::Node { fields, input } => {
681                            if query.returning.is_some() {
682                                returning_field_snaps.push(fields);
683                            }
684                            let node_type = input.node_type.unwrap_or_else(|| input.label.clone());
685                            batch = batch.add_node_with_type(
686                                input.collection,
687                                input.label,
688                                node_type,
689                                input.properties.into_iter().collect(),
690                                input.metadata.into_iter().collect(),
691                            );
692                        }
693                        PreparedGraphInsert::Edge { fields, input } => {
694                            if query.returning.is_some() {
695                                returning_field_snaps.push(fields);
696                            }
697                            batch = batch.add_edge(
698                                input.collection,
699                                input.label,
700                                input.from,
701                                input.to,
702                                input.weight.unwrap_or(1.0),
703                                input.properties.into_iter().collect(),
704                                input.metadata.into_iter().collect(),
705                            );
706                        }
707                    }
708                }
709                let batch_result = batch
710                    .execute()
711                    .map_err(|err| RedDBError::Internal(format!("{err:?}")))?;
712                let (ids, entity_kind) = match query.entity_type {
713                    InsertEntityType::Node => (batch_result.nodes, "graph_node"),
714                    InsertEntityType::Edge => (batch_result.edges, "graph_edge"),
715                    _ => unreachable!("prepared graph insert only handles NODE and EDGE"),
716                };
717                for id in &ids {
718                    self.stamp_xmin_if_in_txn(&query.table, *id);
719                }
720                self.cdc_emit_insert_batch_no_cache_invalidate(&query.table, &ids, entity_kind);
721                entity_outputs.extend(ids.iter().map(|id| {
722                    crate::application::entity::CreateEntityOutput {
723                        id: *id,
724                        entity: None,
725                    }
726                }));
727                inserted_count = ids.len() as u64;
728            } else {
729                for row_values in &effective_rows {
730                    if row_values.len() != query.columns.len() {
731                        return Err(RedDBError::Query(format!(
732                            "INSERT column count ({}) does not match value count ({})",
733                            query.columns.len(),
734                            row_values.len()
735                        )));
736                    }
737
738                    match query.entity_type {
739                        InsertEntityType::Row => {
740                            if query.returning.is_some() {
741                                return Err(RedDBError::Query(
742                                "RETURNING is not yet supported for this INSERT path (TimeSeries)"
743                                    .to_string(),
744                            ));
745                            }
746                            let (fields, mut metadata) =
747                                split_insert_metadata(self, &query.columns, row_values)?;
748                            merge_with_clauses(
749                                &mut metadata,
750                                query.ttl_ms,
751                                query.expires_at_ms,
752                                &query.with_metadata,
753                            );
754                            self.insert_timeseries_point(&query.table, fields, metadata)?;
755                        }
756                        InsertEntityType::Node | InsertEntityType::Edge => {
757                            unreachable!("NODE and EDGE are handled by the prepared graph path")
758                        }
759                        InsertEntityType::Vector => {
760                            let (vector_values, mut metadata) =
761                                split_insert_metadata(self, &query.columns, row_values)?;
762                            merge_with_clauses(
763                                &mut metadata,
764                                query.ttl_ms,
765                                query.expires_at_ms,
766                                &query.with_metadata,
767                            );
768                            let (columns, values) = pairwise_columns_values(&vector_values);
769                            let dense = find_column_value_vec_f32_any(
770                                &columns,
771                                &values,
772                                &["dense", "embedding"],
773                            )?;
774                            merge_vector_metadata_column(&mut metadata, &columns, &values)?;
775                            let content =
776                                find_column_value_opt_string(&columns, &values, "content");
777                            if query.returning.is_some() {
778                                returning_field_snaps.push(vector_values.clone());
779                            }
780                            let input = CreateVectorInput {
781                                collection: query.table.clone(),
782                                dense,
783                                content,
784                                metadata,
785                                link_row: None,
786                                link_node: None,
787                            };
788                            entity_outputs.push(self.create_vector(input)?);
789                        }
790                        InsertEntityType::Document => {
791                            let (document_values, mut metadata) =
792                                split_insert_metadata(self, &query.columns, row_values)?;
793                            merge_with_clauses(
794                                &mut metadata,
795                                query.ttl_ms,
796                                query.expires_at_ms,
797                                &query.with_metadata,
798                            );
799                            let (columns, values) = pairwise_columns_values(&document_values);
800                            let body_str = find_column_value_string(&columns, &values, "body")?;
801                            let body: crate::json::Value = crate::json::from_str(&body_str)
802                                .map_err(|e| {
803                                    RedDBError::Query(format!("invalid JSON body: {e}"))
804                                })?;
805                            if query.returning.is_some() {
806                                returning_field_snaps.push(document_values.clone());
807                            }
808                            let input = CreateDocumentInput {
809                                collection: query.table.clone(),
810                                body,
811                                metadata,
812                                node_links: Vec::new(),
813                                vector_links: Vec::new(),
814                            };
815                            entity_outputs.push(self.create_document(input)?);
816                        }
817                        InsertEntityType::Kv => {
818                            let (kv_values, mut metadata) =
819                                split_insert_metadata(self, &query.columns, row_values)?;
820                            merge_with_clauses(
821                                &mut metadata,
822                                query.ttl_ms,
823                                query.expires_at_ms,
824                                &query.with_metadata,
825                            );
826                            let (columns, values) = pairwise_columns_values(&kv_values);
827                            let key = find_column_value_string(&columns, &values, "key")?;
828                            let value = find_column_value(&columns, &values, "value")?;
829                            if query.returning.is_some() {
830                                returning_field_snaps.push(kv_values.clone());
831                            }
832                            let input = CreateKvInput {
833                                collection: query.table.clone(),
834                                key,
835                                value,
836                                metadata,
837                            };
838                            entity_outputs.push(self.create_kv(input)?);
839                        }
840                    }
841
842                    inserted_count += 1;
843                }
844            }
845
846            if let Some(items) = query.returning.as_ref() {
847                if !entity_outputs.is_empty() {
848                    returning_result = Some(build_returning_result(
849                        items,
850                        &returning_field_snaps,
851                        Some(&entity_outputs),
852                    ));
853                }
854            }
855        }
856
857        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
858        if let Some(ref embed_config) = query.auto_embed {
859            let store = self.inner.db.store();
860            let provider = crate::ai::parse_provider(&embed_config.provider)?;
861            let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
862            let model = embed_config.model.clone().unwrap_or_else(|| {
863                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
864                    .ok()
865                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
866            });
867
868            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
869            let manager = store
870                .get_collection(&query.table)
871                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
872            let entities = manager.query_all(|_| true);
873            let recent: Vec<_> = entities
874                .into_iter()
875                .rev()
876                .take(effective_rows.len())
877                .collect();
878
879            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
880            let entity_combos: Vec<(usize, String)> = recent
881                .iter()
882                .enumerate()
883                .filter_map(|(i, entity)| {
884                    if let EntityData::Row(ref row) = entity.data {
885                        if let Some(ref named) = row.named {
886                            let texts: Vec<String> = embed_config
887                                .fields
888                                .iter()
889                                .filter_map(|field| match named.get(field) {
890                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
891                                    _ => None,
892                                })
893                                .collect();
894                            if !texts.is_empty() {
895                                return Some((i, texts.join(" ")));
896                            }
897                        }
898                    }
899                    None
900                })
901                .collect();
902
903            if !entity_combos.is_empty() {
904                // Batch phase: single provider round-trip for all rows.
905                let batch_texts: Vec<String> =
906                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
907
908                let batch_client =
909                    crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
910
911                let embeddings = match tokio::runtime::Handle::try_current() {
912                    Ok(handle) => tokio::task::block_in_place(|| {
913                        handle.block_on(batch_client.embed_batch(
914                            &provider,
915                            &model,
916                            &api_key,
917                            batch_texts,
918                        ))
919                    }),
920                    Err(_) => {
921                        return Err(RedDBError::Query(
922                            "AUTO EMBED requires a Tokio runtime context".to_string(),
923                        ));
924                    }
925                }
926                .map_err(|e| RedDBError::Query(e.to_string()))?;
927
928                // Distribute phase: persist one vector per non-empty embedding.
929                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
930                    if dense.is_empty() {
931                        continue;
932                    }
933                    self.create_vector(CreateVectorInput {
934                        collection: query.table.clone(),
935                        dense,
936                        content: Some(combined.clone()),
937                        metadata: Vec::new(),
938                        link_row: None,
939                        link_node: None,
940                    })?;
941                }
942            }
943        }
944
945        if inserted_count > 0 {
946            self.note_table_write(&query.table);
947        }
948
949        let mut result = RuntimeQueryResult::dml_result(
950            raw_query.to_string(),
951            inserted_count,
952            "insert",
953            "runtime-dml",
954        );
955        if let Some(returning) = returning_result {
956            result.result = returning;
957        }
958        Ok(result)
959    }
960
961    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
962        let Some(auth_store) = self.inner.auth_store.read().clone() else {
963            return Ok(());
964        };
965        if !auth_store.iam_authorization_enabled() {
966            return Ok(());
967        }
968        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
969            return Ok(());
970        };
971
972        let tenant = crate::runtime::impl_core::current_tenant();
973        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
974        let request = crate::auth::ColumnAccessRequest {
975            action: "insert".to_string(),
976            schema: None,
977            table: query.table.clone(),
978            columns: query.columns.clone(),
979        };
980        let ctx = crate::auth::policies::EvalContext {
981            principal_tenant: tenant.clone(),
982            current_tenant: tenant,
983            peer_ip: None,
984            mfa_present: false,
985            now_ms: crate::auth::now_ms(),
986            principal_is_admin_role: role == crate::auth::Role::Admin,
987        };
988
989        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
990        let table_allowed = matches!(
991            outcome.table_decision,
992            crate::auth::policies::Decision::Allow { .. }
993                | crate::auth::policies::Decision::AdminBypass
994        );
995        if !table_allowed {
996            return Err(RedDBError::Query(format!(
997                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
998                outcome.table_resource.kind, outcome.table_resource.name
999            )));
1000        }
1001        if let Some(denied) = outcome.first_denied_column() {
1002            return Err(RedDBError::Query(format!(
1003                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
1004                denied.resource.kind, denied.resource.name
1005            )));
1006        }
1007
1008        Ok(())
1009    }
1010
    /// Insert a single time-series point into `collection`.
    ///
    /// `fields` carries the SQL column/value pairs (requires `metric` and
    /// `value`; timestamp and tags columns are optional). `metadata` is
    /// extended with the collection's default TTL metadata before being
    /// attached to the new entity.
    ///
    /// Returns the id of the inserted entity, and emits a CDC insert event.
    pub(crate) fn insert_timeseries_point(
        &self,
        collection: &str,
        fields: Vec<(String, Value)>,
        mut metadata: Vec<(String, MetadataValue)>,
    ) -> RedDBResult<EntityId> {
        // Merge the collection's default TTL metadata before persisting.
        apply_collection_default_ttl_metadata(self, collection, &mut metadata);

        let (columns, values) = pairwise_columns_values(&fields);
        validate_timeseries_insert_columns(&columns)?;

        // `metric` and `value` are mandatory; the timestamp defaults to
        // "now" (unix nanoseconds) when the row does not supply one.
        let metric = find_column_value_string(&columns, &values, "metric")?;
        let value = find_column_value_f64(&columns, &values, "value")?;
        let timestamp_ns =
            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
        let tags = find_timeseries_tags(&columns, &values)?;

        // Id 0 is a placeholder — `insert_auto` below is expected to
        // allocate the real id (TODO confirm against store contract).
        let mut entity = UnifiedEntity::new(
            EntityId::new(0),
            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
                series: collection.to_string(),
                metric: metric.clone(),
            })),
            EntityData::TimeSeries(crate::storage::TimeSeriesData {
                metric,
                timestamp_ns,
                value,
                tags,
            }),
        );
        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
        // or an autocommit xid (allocated and committed up-front so
        // future snapshots see the row as soon as it lands).
        let writer_xid = match self.current_xid() {
            Some(xid) => xid,
            None => {
                let mgr = self.snapshot_manager();
                let xid = mgr.begin();
                mgr.commit(xid);
                xid
            }
        };
        entity.set_xmin(writer_xid);

        let store = self.inner.db.store();
        let id = store
            .insert_auto(collection, entity)
            .map_err(|err| RedDBError::Internal(err.to_string()))?;

        if !metadata.is_empty() {
            // NOTE(review): a set_metadata failure is swallowed here, so
            // the point is inserted without its metadata — confirm this
            // best-effort behavior is intended rather than an oversight.
            let _ = store.set_metadata(
                collection,
                id,
                Metadata::with_fields(metadata.into_iter().collect()),
            );
        }

        // Announce the insert on the CDC stream after the entity is durable.
        self.cdc_emit(
            crate::replication::cdc::ChangeOperation::Insert,
            collection,
            id.raw(),
            "timeseries",
        );

        Ok(id)
    }
1077
    /// Execute UPDATE table SET col=val, ... WHERE filter
    ///
    /// Scans the target collection, evaluates the WHERE filter against each
    /// record, and patches every matching entity.
    ///
    /// Gate order: write gate → collection contract → RLS augmentation →
    /// RETURNING wrapper → inner tracked executor. Each gate may
    /// short-circuit with an error (or, for a denying RLS policy, a
    /// zero-row success).
    pub fn execute_update(
        &self,
        raw_query: &str,
        query: &UpdateQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // CollectionContract gate (#50): runs the APPEND ONLY guard
        // (and any future contract bits) before RLS / RETURNING work
        // so the operator's immutability declaration is honoured
        // uniformly and the error message points at the DDL rather
        // than at a downstream symptom.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Update,
        )?;

        // Apply RLS augmentation first so every downstream path — plain
        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
        // same policy-filtered target set. This prevents RETURNING
        // from ever exposing rows the UPDATE policy would have
        // denied.
        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
        // `augmented_query` is declared in the outer scope so that
        // `effective_query` can borrow either the caller's query or the
        // RLS-augmented clone without a lifetime error.
        let augmented_query: UpdateQuery;
        let effective_query: &UpdateQuery = if rls_gated {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Update,
            );
            let Some(policy) = rls_filter else {
                // No admitting policy: zero rows affected, empty
                // RETURNING (never leak rows the caller can't touch).
                let mut response = RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "update",
                    "runtime-dml-rls",
                );
                if let Some(items) = query.returning.clone() {
                    response.result = build_returning_result(&items, &[], None);
                }
                return Ok(response);
            };
            // AND the policy filter onto any user-supplied WHERE clause.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            augmented_query = augmented;
            &augmented_query
        } else {
            query
        };

        // RETURNING wraps the inner executor and uses the touched-id
        // list the inner reports so the post-image reflects exactly
        // the rows the UPDATE actually mutated (not whatever a
        // separate SELECT might have observed).
        if let Some(items) = effective_query.returning.clone() {
            let mut inner_query = effective_query.clone();
            inner_query.returning = None;
            let (mut response, touched_ids) =
                self.execute_update_inner_tracked(raw_query, &inner_query)?;

            // Re-read the touched rows (no filter, no limit) to build the
            // post-update snapshots RETURNING projects from.
            let snapshots = super::dml_target_scan::DmlTargetScan::new(
                self,
                &effective_query.table,
                None,
                None,
            )
            .row_snapshots(&touched_ids);

            response.result = build_returning_result(&items, &snapshots, None);
            response.engine = "runtime-dml-returning";
            return Ok(response);
        }

        self.execute_update_inner(raw_query, effective_query)
    }
1164
1165    /// Back-compat shim: the older entry point ignored touched ids.
1166    fn execute_update_inner(
1167        &self,
1168        raw_query: &str,
1169        query: &UpdateQuery,
1170    ) -> RedDBResult<RuntimeQueryResult> {
1171        self.execute_update_inner_tracked(raw_query, query)
1172            .map(|(res, _)| res)
1173    }
1174
1175    fn execute_update_inner_tracked(
1176        &self,
1177        raw_query: &str,
1178        query: &UpdateQuery,
1179    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
1180        let store = self.inner.db.store();
1181        let effective_filter = effective_update_filter(query);
1182        let compiled_plan = self.compile_update_plan(query)?;
1183        let mut touched_ids: Vec<EntityId> = Vec::new();
1184        let limit_cap = query.limit.map(|l| l as usize);
1185        let manager = store
1186            .get_collection(&query.table)
1187            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
1188        let ids_to_update = super::dml_target_scan::DmlTargetScan::new(
1189            self,
1190            &query.table,
1191            effective_filter.as_ref(),
1192            limit_cap,
1193        )
1194        .find_target_ids()?;
1195
1196        let mut affected: u64 = 0;
1197        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
1198            let mut applied_chunk = Vec::with_capacity(chunk.len());
1199            for entity in manager.get_many(chunk).into_iter().flatten() {
1200                let assignments =
1201                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
1202                let applied = self.apply_materialized_update_for_entity(
1203                    query.table.clone(),
1204                    entity,
1205                    &compiled_plan,
1206                    assignments,
1207                )?;
1208                touched_ids.push(applied.id);
1209                applied_chunk.push(applied);
1210            }
1211            self.persist_update_chunk(&applied_chunk)?;
1212            affected += applied_chunk.len() as u64;
1213            let lsns = self.flush_update_chunk(&applied_chunk)?;
1214            if !query.suppress_events {
1215                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
1216            }
1217        }
1218
1219        if affected > 0 {
1220            self.note_table_write(&query.table);
1221        }
1222
1223        Ok((
1224            RuntimeQueryResult::dml_result(
1225                raw_query.to_string(),
1226                affected,
1227                "update",
1228                "runtime-dml",
1229            ),
1230            touched_ids,
1231        ))
1232    }
1233
    /// Pre-compile an UPDATE's SET clause into a `CompiledUpdatePlan`.
    ///
    /// Assignments whose expression constant-folds become static field /
    /// metadata assignments, applied unchanged to every matched entity;
    /// everything else stays a dynamic assignment re-evaluated per entity.
    /// Columns that `resolve_sql_ttl_metadata_key` recognises are routed
    /// into metadata (canonicalised) rather than row fields. Row-contract
    /// violations on dynamic assignments — manually setting managed
    /// timestamps, or undeclared fields under a strict schema — are
    /// rejected here, before any row is touched.
    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
        let mut static_field_assignments = Vec::new();
        let mut static_metadata_assignments = Vec::new();
        let mut dynamic_assignments = Vec::new();
        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
        // Row columns written by this UPDATE (TTL/metadata keys excluded);
        // used below for the unique-column touch flag.
        let mut row_modified_columns = Vec::new();

        for (column, expr) in &query.assignment_exprs {
            let metadata_key = resolve_sql_ttl_metadata_key(column);
            // Constant-foldable expression: classify it now so the per-row
            // apply path does no expression evaluation at all.
            // NOTE(review): this static path relies on
            // `normalize_row_update_assignment_with_plan` for contract
            // enforcement; the explicit managed-timestamp check below only
            // guards the dynamic path — confirm the helper covers it.
            if let Ok(value) = fold_expr_to_value(expr.clone()) {
                if let Some(metadata_key) = metadata_key {
                    let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
                    let (canonical_key, canonical_value) =
                        canonicalize_sql_ttl_metadata(metadata_key, raw_value);
                    static_metadata_assignments.push((canonical_key.to_string(), canonical_value));
                } else {
                    let value = self.resolve_crypto_sentinel(value)?;
                    static_field_assignments.push((
                        column.clone(),
                        normalize_row_update_assignment_with_plan(
                            &query.table,
                            column,
                            value,
                            row_contract_plan.as_ref(),
                        )?,
                    ));
                    row_modified_columns.push(column.clone());
                }
                continue;
            }

            // Non-constant expression: defer evaluation to apply time, but
            // resolve (or reject via) the row-contract rule up front.
            dynamic_assignments.push(CompiledUpdateAssignment {
                column: column.clone(),
                expr: expr.clone(),
                metadata_key,
                row_rule: if metadata_key.is_none() {
                    if let Some(plan) = row_contract_plan.as_ref() {
                        // Managed timestamp columns may never be set manually.
                        if plan.timestamps_enabled
                            && (column == "created_at" || column == "updated_at")
                        {
                            return Err(RedDBError::Query(format!(
                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
                                query.table, column
                            )));
                        }
                        if let Some(rule) = plan.declared_rules.get(column) {
                            Some(rule.clone())
                        } else if plan.strict_schema {
                            return Err(RedDBError::Query(format!(
                                "collection '{}' is strict and does not allow undeclared fields: {}",
                                query.table, column
                            )));
                        } else {
                            None
                        }
                    } else {
                        None
                    }
                } else {
                    None
                },
            });
            if metadata_key.is_none() {
                row_modified_columns.push(column.clone());
            }
        }

        let row_modified_columns = dedupe_update_columns(row_modified_columns);
        // Flag whether this UPDATE touches any declared unique column
        // (matched case-insensitively against the contract's set).
        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
            row_modified_columns.iter().any(|column| {
                plan.unique_columns
                    .keys()
                    .any(|unique| unique.eq_ignore_ascii_case(column))
            })
        });

        // WITH TTL / EXPIRES / metadata clauses become additional static
        // metadata assignments applied to every matched entity.
        if let Some(ttl_ms) = query.ttl_ms {
            static_metadata_assignments
                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
        }
        if let Some(expires_at_ms) = query.expires_at_ms {
            static_metadata_assignments.push((
                "_expires_at".to_string(),
                metadata_u64_to_value(expires_at_ms),
            ));
        }
        for (key, val) in &query.with_metadata {
            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
        }

        Ok(CompiledUpdatePlan {
            static_field_assignments,
            static_metadata_assignments,
            dynamic_assignments,
            row_contract_plan,
            row_modified_columns,
            row_touches_unique_columns,
        })
    }
1333
1334    fn materialize_update_assignments_for_entity(
1335        &self,
1336        query: &UpdateQuery,
1337        entity: &UnifiedEntity,
1338        compiled_plan: &CompiledUpdatePlan,
1339    ) -> RedDBResult<MaterializedUpdateAssignments> {
1340        let mut assignments = MaterializedUpdateAssignments::default();
1341        let mut record: Option<UnifiedRecord> = None;
1342
1343        for assignment in &compiled_plan.dynamic_assignments {
1344            if record.is_none() {
1345                record = runtime_any_record_from_entity_ref(entity);
1346            }
1347            let Some(record) = record.as_ref() else {
1348                return Err(RedDBError::Query(format!(
1349                    "UPDATE could not materialize runtime record for entity {} in '{}'",
1350                    entity.id.raw(),
1351                    query.table
1352                )));
1353            };
1354            let value = super::expr_eval::evaluate_runtime_expr_with_db(
1355                Some(self.inner.db.as_ref()),
1356                &assignment.expr,
1357                record,
1358                Some(query.table.as_str()),
1359                Some(query.table.as_str()),
1360            )
1361            .ok_or_else(|| {
1362                RedDBError::Query(format!(
1363                    "failed to evaluate UPDATE expression for column '{}'",
1364                    assignment.column
1365                ))
1366            })?;
1367
1368            if let Some(metadata_key) = assignment.metadata_key {
1369                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
1370                let (canonical_key, canonical_value) =
1371                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1372                assignments
1373                    .dynamic_metadata_assignments
1374                    .push((canonical_key.to_string(), canonical_value));
1375            } else {
1376                assignments.dynamic_field_assignments.push((
1377                    assignment.column.clone(),
1378                    normalize_row_update_value_for_rule(
1379                        &query.table,
1380                        self.resolve_crypto_sentinel(value)?,
1381                        assignment.row_rule.as_ref(),
1382                    )?,
1383                ));
1384            }
1385        }
1386
1387        Ok(assignments)
1388    }
1389
1390    fn apply_materialized_update_for_entity(
1391        &self,
1392        collection: String,
1393        entity: UnifiedEntity,
1394        compiled_plan: &CompiledUpdatePlan,
1395        assignments: MaterializedUpdateAssignments,
1396    ) -> RedDBResult<AppliedEntityMutation> {
1397        if matches!(entity.data, EntityData::Row(_)) {
1398            return self.apply_loaded_sql_update_row_core(
1399                collection,
1400                entity,
1401                &compiled_plan.static_field_assignments,
1402                assignments.dynamic_field_assignments,
1403                &compiled_plan.static_metadata_assignments,
1404                assignments.dynamic_metadata_assignments,
1405                compiled_plan.row_contract_plan.as_ref(),
1406                &compiled_plan.row_modified_columns,
1407                compiled_plan.row_touches_unique_columns,
1408            );
1409        }
1410
1411        self.apply_loaded_patch_entity_core(
1412            collection,
1413            entity,
1414            crate::json::Value::Null,
1415            build_patch_operations_from_materialized_assignments(compiled_plan, assignments),
1416        )
1417    }
1418
    /// Execute DELETE FROM table WHERE filter
    ///
    /// Order of operations: write-gate check, collection-contract gate,
    /// RETURNING pre-image capture (when requested), RLS policy folding,
    /// then the actual scan-and-delete in `execute_delete_inner`.
    pub fn execute_delete(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // CollectionContract gate (#50) — see execute_update for
        // rationale. The gate handles APPEND ONLY rejection and is
        // the single point where future contract bits land.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Delete,
        )?;

        // RETURNING on DELETE: capture the pre-image via an internal
        // SELECT that reuses the same WHERE, then run the delete with
        // the RETURNING clause stripped, then project the captured
        // rows through the requested items. The extra SELECT is a
        // pragmatic MVP — a future pass can fuse the scan with the
        // delete to avoid the second pass over the heap.
        if let Some(items) = query.returning.clone() {
            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
                RedDBError::Query(
                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
                )
            })?;
            let captured = self.execute_query(&select_sql)?;

            // Recurse with RETURNING stripped; the recursive call takes
            // the non-RETURNING path below (including the RLS branch).
            // NOTE(review): the pre-image SELECT above is not explicitly
            // RLS-filtered here — if `execute_query` does not apply RLS
            // to SELECTs, the captured rows may include rows the gated
            // delete will not remove. Confirm against the SELECT path.
            let mut inner_query = query.clone();
            inner_query.returning = None;
            let _ = self.execute_delete(raw_query, &inner_query)?;

            // Snapshot each captured record as (column, value) pairs for
            // RETURNING projection.
            let snapshots: Vec<Vec<(String, Value)>> = captured
                .result
                .records
                .iter()
                .map(|rec| {
                    rec.iter_fields()
                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
                        .collect()
                })
                .collect();
            // Affected count is derived from the pre-image capture, not
            // from the inner delete's own count.
            let affected = snapshots.len() as u64;
            let result = build_returning_result(&items, &snapshots, None);

            let mut response = RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "delete",
                "runtime-dml-returning",
            );
            response.result = result;
            return Ok(response);
        }
        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
        //
        // When the table has RLS enabled, gate the DELETE by the
        // per-role policy set: mutations only touch rows that *every*
        // matching `FOR DELETE` policy would accept. No policies =>
        // zero rows affected (PG restrictive-default).
        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Delete,
            );
            let Some(policy) = rls_filter else {
                // No applicable policy: restrictive default, delete nothing.
                return Ok(RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "delete",
                    "runtime-dml-rls",
                ));
            };
            // Fold the policy predicate into the user's WHERE before
            // dispatching — the remainder of this function reads the
            // filter from `query` via `effective_delete_filter`, which
            // respects the updated value.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            return self.execute_delete_inner(raw_query, &augmented);
        }
        self.execute_delete_inner(raw_query, query)
    }
1510
    /// Scan-and-delete core shared by the plain and RLS-augmented DELETE
    /// paths: resolve the effective WHERE, find matching entity ids,
    /// snapshot pre-images when delete subscriptions exist, delete in
    /// fixed-size chunks, and emit delete events per chunk.
    fn execute_delete_inner(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        let effective_filter = effective_delete_filter(query);

        // Find the rows that match the WHERE clause. The "find target
        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
        // the same scan strategy.
        let scan = super::dml_target_scan::DmlTargetScan::new(
            self,
            &query.table,
            effective_filter.as_ref(),
            None,
        );
        let ids_to_delete = scan.find_target_ids()?;

        // For event-enabled collections, snapshot the pre-delete state
        // before rows are physically removed.
        let needs_delete_events =
            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
            scan.row_json_pre_images(&ids_to_delete)
        } else {
            HashMap::new()
        };

        // Delete in UPDATE_APPLY_CHUNK_SIZE chunks so a huge target set
        // never turns into a single giant batch.
        let mut affected: u64 = 0;
        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
            affected += count;
            if needs_delete_events && !lsns.is_empty() {
                // lsns.len() == actually-deleted entities; align with chunk ids.
                // `delete_batch` may skip missing entities, so we correlate by
                // the number returned (they're emitted in chunk order).
                // NOTE(review): this prefix-slice correlation assumes skipped
                // (missing) entities are always a suffix of the chunk; if the
                // batch can skip an id in the middle, ids and LSNs would be
                // mispaired — confirm against delete_entities_batch.
                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
                self.emit_delete_events_for_collection(
                    &query.table,
                    deleted_chunk,
                    &lsns,
                    &pre_images,
                )?;
            }
        }
        pre_images.clear();

        if affected > 0 {
            // Bump the table's write clock so caches/subscribers notice.
            self.note_table_write(&query.table);
        }

        Ok(RuntimeQueryResult::dml_result(
            raw_query.to_string(),
            affected,
            "delete",
            "runtime-dml",
        ))
    }
1569}
1570
1571fn build_patch_operations_from_materialized_assignments(
1572    compiled_plan: &CompiledUpdatePlan,
1573    assignments: MaterializedUpdateAssignments,
1574) -> Vec<PatchEntityOperation> {
1575    let mut operations = Vec::with_capacity(
1576        compiled_plan.static_field_assignments.len()
1577            + compiled_plan.static_metadata_assignments.len()
1578            + assignments.dynamic_field_assignments.len()
1579            + assignments.dynamic_metadata_assignments.len(),
1580    );
1581
1582    for (column, value) in &compiled_plan.static_field_assignments {
1583        operations.push(PatchEntityOperation {
1584            op: PatchEntityOperationType::Set,
1585            path: vec!["fields".to_string(), column.clone()],
1586            value: Some(storage_value_to_json(value)),
1587        });
1588    }
1589
1590    for (column, value) in assignments.dynamic_field_assignments {
1591        operations.push(PatchEntityOperation {
1592            op: PatchEntityOperationType::Set,
1593            path: vec!["fields".to_string(), column],
1594            value: Some(storage_value_to_json(&value)),
1595        });
1596    }
1597
1598    for (key, value) in &compiled_plan.static_metadata_assignments {
1599        operations.push(PatchEntityOperation {
1600            op: PatchEntityOperationType::Set,
1601            path: vec!["metadata".to_string(), key.clone()],
1602            value: Some(metadata_value_to_json(value)),
1603        });
1604    }
1605
1606    for (key, value) in assignments.dynamic_metadata_assignments {
1607        operations.push(PatchEntityOperation {
1608            op: PatchEntityOperationType::Set,
1609            path: vec!["metadata".to_string(), key],
1610            value: Some(metadata_value_to_json(&value)),
1611        });
1612    }
1613
1614    operations
1615}
1616
/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
/// capture the pre-image before actually removing the rows. Returns
/// `None` when the input does not start with `DELETE`.
///
/// Case-insensitive on the keywords, and tolerant of any ASCII
/// whitespace (space, tab, newline) around them — the previous
/// implementation only accepted `DELETE<space|tab>` and a literal
/// `" from "`, which rejected valid statements like `DELETE\nFROM t`.
/// Preserves everything between the table name and the RETURNING
/// clause, so WHERE / ORDER BY / LIMIT survive untouched. The
/// RETURNING tail — if present — is truncated at the first top-level
/// `RETURNING` token.
fn delete_to_select_sql(sql: &str) -> Option<String> {
    let trimmed = sql.trim_start();
    // ASCII lowercasing preserves byte offsets, so positions found in
    // `lowered` index directly into `trimmed`.
    let lowered = trimmed.to_ascii_lowercase();

    // Require a leading `DELETE` keyword followed by whitespace.
    let after_delete = lowered.strip_prefix("delete")?;
    if !after_delete.starts_with(|c: char| c.is_ascii_whitespace()) {
        return None;
    }

    // Locate `FROM` as a whitespace-bounded token (outside string
    // literals), anywhere after the DELETE keyword.
    let delete_len = "delete".len();
    let from_pos = find_top_level_keyword(&lowered[delete_len..], "from")?;
    let body_start = delete_len + from_pos + "from".len();

    // Cut off the RETURNING tail. `find_top_level_keyword` skips
    // matches inside single-quoted literals, so a value that happens
    // to contain the word `returning` is left alone.
    let mut body = trimmed[body_start..].to_string();
    if let Some(pos) = find_top_level_keyword(&lowered[body_start..], "returning") {
        body.truncate(pos);
    }
    Some(format!("SELECT * FROM {}", body.trim()))
}

/// Find the byte offset of a whitespace-bounded keyword in a
/// lowercased haystack, skipping matches inside single-quoted
/// string literals. Naive — no escape handling — but enough for
/// the shapes the DML parser emits.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let nlen = needle.len();
    let mut i = 0usize;
    let mut in_string = false;
    while i < bytes.len() {
        let c = bytes[i];
        if c == b'\'' {
            // Toggle on every quote; escapes are not handled (see doc).
            in_string = !in_string;
            i += 1;
            continue;
        }
        // A hit must match the needle bytes and be bounded by either a
        // haystack edge or ASCII whitespace on both sides.
        if !in_string
            && i + nlen <= bytes.len()
            && &bytes[i..i + nlen] == needle.as_bytes()
            && (i == 0 || bytes[i - 1].is_ascii_whitespace())
            && (i + nlen == bytes.len() || bytes[i + nlen].is_ascii_whitespace())
        {
            return Some(i);
        }
        i += 1;
    }
    None
}
1676
1677/// Build a `UnifiedResult` from the rows affected by a DML statement plus
1678/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
1679/// for one affected row; `outputs`, when provided, supplies the engine-
1680/// assigned entity id for the same row (INSERT path). Projection honours
1681/// the RETURNING items: `*` expands to every snapshot column plus
1682/// `red_entity_id` when available.
1683fn build_returning_result(
1684    items: &[ReturningItem],
1685    snapshots: &[Vec<(String, Value)>],
1686    outputs: Option<&[crate::application::entity::CreateEntityOutput]>,
1687) -> UnifiedResult {
1688    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));
1689
1690    let mut columns: Vec<String> = if project_all {
1691        let mut cols: Vec<String> = Vec::new();
1692        if outputs.is_some() {
1693            cols.push("red_entity_id".to_string());
1694        }
1695        if let Some(first) = snapshots.first() {
1696            for (name, _) in first {
1697                cols.push(name.clone());
1698            }
1699        }
1700        cols
1701    } else {
1702        items
1703            .iter()
1704            .filter_map(|it| match it {
1705                ReturningItem::Column(c) => Some(c.clone()),
1706                ReturningItem::All => None,
1707            })
1708            .collect()
1709    };
1710    // Guarantee unique order-preserving column list.
1711    {
1712        let mut seen = std::collections::HashSet::new();
1713        columns.retain(|c| seen.insert(c.clone()));
1714    }
1715
1716    let id_key = sys_key_red_entity_id();
1717    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
1718    for (idx, snap) in snapshots.iter().enumerate() {
1719        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
1720        if let Some(outs) = outputs {
1721            if let Some(out) = outs.get(idx) {
1722                values.insert(Arc::clone(&id_key), Value::Integer(out.id.raw() as i64));
1723            }
1724        }
1725        for (name, val) in snap {
1726            values.insert(Arc::from(name.as_str()), val.clone());
1727        }
1728        let mut rec = UnifiedRecord::default();
1729        // Only keep projected columns on the record.
1730        for col in &columns {
1731            if let Some(v) = values.get(col.as_str()) {
1732                rec.set_arc(Arc::from(col.as_str()), v.clone());
1733            }
1734        }
1735        records.push(rec);
1736    }
1737
1738    UnifiedResult {
1739        columns,
1740        records,
1741        stats: Default::default(),
1742        pre_serialized_json: None,
1743    }
1744}
1745
/// Ensure `collection` may accept graph writes (node/edge INSERTs).
///
/// An existing contract is honored: implicit+dynamic contracts are
/// advisory and always pass; declared `Graph`/`Mixed` models pass; any
/// other declared model is rejected. When no contract exists yet, an
/// implicit dynamic `Graph` contract is persisted so later writes see a
/// consistent declaration.
fn ensure_graph_insert_contract(runtime: &RedDBRuntime, collection: &str) -> RedDBResult<()> {
    let db = runtime.db();
    if let Some(contract) = db.collection_contract(collection) {
        // Implicit + dynamic contracts were auto-created, not operator
        // declared — treat them as advisory and allow the write.
        let advisory_implicit_dynamic = matches!(
            (&contract.origin, &contract.schema_mode),
            (
                crate::physical::ContractOrigin::Implicit,
                crate::catalog::SchemaMode::Dynamic,
            )
        );
        if advisory_implicit_dynamic
            || matches!(
                contract.declared_model,
                crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed
            )
        {
            return Ok(());
        }
        return Err(RedDBError::InvalidOperation(format!(
            "collection '{}' is declared as '{:?}' and does not allow 'Graph' writes",
            collection, contract.declared_model
        )));
    }

    // No contract yet: persist an implicit dynamic Graph contract,
    // timestamped with the current wall clock (epoch ms).
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    db.save_collection_contract(crate::physical::CollectionContract {
        name: collection.to_string(),
        declared_model: crate::catalog::CollectionModel::Graph,
        schema_mode: crate::catalog::SchemaMode::Dynamic,
        origin: crate::physical::ContractOrigin::Implicit,
        version: 1,
        created_at_unix_ms: now,
        updated_at_unix_ms: now,
        // Inherit any default TTL already configured for the collection.
        default_ttl_ms: db.collection_default_ttl_ms(collection),
        vector_dimension: None,
        vector_metric: None,
        context_index_fields: Vec::new(),
        declared_columns: Vec::new(),
        table_def: None,
        timestamps_enabled: false,
        context_index_enabled: false,
        append_only: false,
        subscriptions: Vec::new(),
    })
    .map(|_| ())
    .map_err(|err| RedDBError::Internal(err.to_string()))
}
1796
/// Collapse case-insensitive duplicate column names, keeping the first
/// occurrence of each name (with its original casing and position).
///
/// Tracks lowercased names in a set, turning the previous O(n²)
/// pairwise scan into a single order-preserving O(n) pass; behavior is
/// otherwise unchanged (`eq_ignore_ascii_case` equality is exactly
/// ASCII-lowercase equality).
fn dedupe_update_columns(mut columns: Vec<String>) -> Vec<String> {
    let mut seen = std::collections::HashSet::with_capacity(columns.len());
    // `insert` returns false for an already-seen key, dropping the dup.
    columns.retain(|column| seen.insert(column.to_ascii_lowercase()));
    columns
}
1813
1814// =============================================================================
1815// Helper functions for extracting typed values from column/value pairs
1816// =============================================================================
1817
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Map a legacy TTL column name (matched case-insensitively) to its
/// canonical `&'static` metadata key; ordinary columns yield `None`.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|candidate| candidate.eq_ignore_ascii_case(column))
}
1831
1832/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
1833/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
1834/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
1835/// `_ttl_ms` and `_expires_at` are passed through.
1836fn canonicalize_sql_ttl_metadata(
1837    key: &'static str,
1838    value: MetadataValue,
1839) -> (&'static str, MetadataValue) {
1840    if key != "_ttl" {
1841        return (key, value);
1842    }
1843    let scaled = match value {
1844        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
1845        MetadataValue::Timestamp(ms_or_s) => {
1846            // Timestamp is already chosen for very large values; treat as
1847            // already-ms to avoid silent overflow.
1848            MetadataValue::Timestamp(ms_or_s)
1849        }
1850        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
1851        other => other,
1852    };
1853    ("_ttl_ms", scaled)
1854}
1855
/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
/// `SECRET('...')` literals. The runtime strips this marker and
/// applies the actual crypto transform during INSERT execution.
pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";

impl RedDBRuntime {
    /// Strip the plaintext sentinel from a `Value::Password` or
    /// `Value::Secret` produced by the parser and apply the real
    /// crypto transform. `Password` is always hashed with argon2id.
    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
    /// when `red.config.secret.auto_encrypt = true` (default).
    ///
    /// Values without the sentinel (already hashed/encrypted, or any
    /// other `Value` variant) pass through untouched.
    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
        match value {
            Value::Password(marked) => {
                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
                } else {
                    // No sentinel: caller supplied a pre-hashed value.
                    Ok(Value::Password(marked))
                }
            }
            Value::Secret(bytes) => {
                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
                    // Refuse plaintext secrets when auto-encryption is
                    // explicitly disabled — never store them as-is.
                    if !self.secret_auto_encrypt() {
                        return Err(RedDBError::Query(
                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
                             is false. Insert pre-encrypted bytes directly instead."
                                .to_string(),
                        ));
                    }
                    // Encryption requires the vault-provisioned AES key.
                    let key = self.secret_aes_key().ok_or_else(|| {
                        RedDBError::Query(
                            "SECRET() column encryption requires a bootstrapped \
                             vault (red.secret.aes_key is missing). Start the server \
                             with --vault to enable."
                                .to_string(),
                        )
                    })?;
                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
                } else {
                    // No sentinel: bytes are already ciphertext.
                    Ok(Value::Secret(bytes))
                }
            }
            other => Ok(other),
        }
    }
}
1903
1904/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
1905/// This is the on-disk representation of `Value::Secret`.
1906fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
1907    let nonce_bytes = crate::auth::store::random_bytes(12);
1908    let mut nonce = [0u8; 12];
1909    nonce.copy_from_slice(&nonce_bytes[..12]);
1910    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
1911    let mut out = Vec::with_capacity(12 + ct.len());
1912    out.extend_from_slice(&nonce);
1913    out.extend_from_slice(&ct);
1914    out
1915}
1916
1917/// Decode a `Value::Secret` payload back to plaintext. Returns
1918/// `None` when the payload is too short or AES-GCM authentication
1919/// fails (tampered or wrong key).
1920pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
1921    if payload.len() < 12 {
1922        return None;
1923    }
1924    let mut nonce = [0u8; 12];
1925    nonce.copy_from_slice(&payload[..12]);
1926    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
1927}
1928
1929fn split_insert_metadata(
1930    runtime: &RedDBRuntime,
1931    columns: &[String],
1932    values: &[Value],
1933) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
1934    let mut fields = Vec::new();
1935    let mut metadata = Vec::new();
1936
1937    for (column, value) in columns.iter().zip(values.iter()) {
1938        // Still support legacy _ttl columns for backward compat
1939        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
1940            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
1941            let (canonical_key, canonical_value) =
1942                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1943            metadata.push((canonical_key.to_string(), canonical_value));
1944            continue;
1945        }
1946        fields.push((
1947            column.clone(),
1948            runtime.resolve_crypto_sentinel(value.clone())?,
1949        ));
1950    }
1951
1952    Ok((fields, metadata))
1953}
1954
1955/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
1956fn merge_with_clauses(
1957    metadata: &mut Vec<(String, MetadataValue)>,
1958    ttl_ms: Option<u64>,
1959    expires_at_ms: Option<u64>,
1960    with_metadata: &[(String, Value)],
1961) {
1962    if let Some(ms) = ttl_ms {
1963        metadata.push((
1964            "_ttl_ms".to_string(),
1965            if ms <= i64::MAX as u64 {
1966                MetadataValue::Int(ms as i64)
1967            } else {
1968                MetadataValue::Timestamp(ms)
1969            },
1970        ));
1971    }
1972    if let Some(ms) = expires_at_ms {
1973        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
1974    }
1975    for (key, value) in with_metadata {
1976        let meta_value = match value {
1977            Value::Text(s) => MetadataValue::String(s.to_string()),
1978            Value::Integer(n) => MetadataValue::Int(*n),
1979            Value::Float(n) => MetadataValue::Float(*n),
1980            Value::Boolean(b) => MetadataValue::Bool(*b),
1981            _ => MetadataValue::String(value.to_string()),
1982        };
1983        metadata.push((key.clone(), meta_value));
1984    }
1985}
1986
1987fn merge_vector_metadata_column(
1988    metadata: &mut Vec<(String, MetadataValue)>,
1989    columns: &[String],
1990    values: &[Value],
1991) -> RedDBResult<()> {
1992    let Some(value) = columns
1993        .iter()
1994        .position(|column| column.eq_ignore_ascii_case("metadata"))
1995        .map(|index| &values[index])
1996    else {
1997        return Ok(());
1998    };
1999    let json = match value {
2000        Value::Null => return Ok(()),
2001        Value::Json(bytes) => crate::json::from_slice(bytes).map_err(|err| {
2002            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
2003        })?,
2004        Value::Text(text) => crate::json::from_str(text).map_err(|err| {
2005            RedDBError::Query(format!("column 'metadata' invalid JSON object: {err}"))
2006        })?,
2007        other => {
2008            return Err(RedDBError::Query(format!(
2009                "column 'metadata' expected JSON object, got {other:?}"
2010            )))
2011        }
2012    };
2013    let parsed = metadata_from_json(&json)?;
2014    for (key, value) in parsed.iter() {
2015        metadata.push((key.clone(), value.clone()));
2016    }
2017    Ok(())
2018}
2019
2020fn apply_collection_default_ttl_metadata(
2021    runtime: &RedDBRuntime,
2022    collection: &str,
2023    metadata: &mut Vec<(String, MetadataValue)>,
2024) {
2025    if has_internal_ttl_metadata(metadata) {
2026        return;
2027    }
2028
2029    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
2030        return;
2031    };
2032
2033    metadata.push((
2034        "_ttl_ms".to_string(),
2035        if default_ttl_ms <= i64::MAX as u64 {
2036            MetadataValue::Int(default_ttl_ms as i64)
2037        } else {
2038            MetadataValue::Timestamp(default_ttl_ms)
2039        },
2040    ));
2041}
2042
2043fn ensure_non_tree_reserved_metadata_entries(
2044    metadata: &[(String, MetadataValue)],
2045) -> RedDBResult<()> {
2046    for (key, _) in metadata {
2047        ensure_non_tree_reserved_metadata_key(key)?;
2048    }
2049    Ok(())
2050}
2051
2052fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2053    if key.starts_with(TREE_METADATA_PREFIX) {
2054        return Err(RedDBError::Query(format!(
2055            "metadata key '{}' is reserved for managed trees",
2056            key
2057        )));
2058    }
2059    Ok(())
2060}
2061
2062fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2063    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2064        return Err(RedDBError::Query(format!(
2065            "edge label '{}' is reserved for managed trees",
2066            TREE_CHILD_EDGE_LABEL
2067        )));
2068    }
2069    Ok(())
2070}
2071
2072fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
2073    let mut columns = Vec::with_capacity(pairs.len());
2074    let mut values = Vec::with_capacity(pairs.len());
2075
2076    for (column, value) in pairs {
2077        columns.push(column.clone());
2078        values.push(value.clone());
2079    }
2080
2081    (columns, values)
2082}
2083
2084/// Find a required column value and return it as-is.
2085fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
2086    for (i, col) in columns.iter().enumerate() {
2087        if col.eq_ignore_ascii_case(name) {
2088            return Ok(values[i].clone());
2089        }
2090    }
2091    Err(RedDBError::Query(format!(
2092        "required column '{name}' not found in INSERT"
2093    )))
2094}
2095
2096/// Find a required column value and coerce to String.
2097fn find_column_value_string(
2098    columns: &[String],
2099    values: &[Value],
2100    name: &str,
2101) -> RedDBResult<String> {
2102    let val = find_column_value(columns, values, name)?;
2103    match val {
2104        Value::Text(s) => Ok(s.to_string()),
2105        Value::Integer(n) => Ok(n.to_string()),
2106        Value::Float(n) => Ok(n.to_string()),
2107        other => Err(RedDBError::Query(format!(
2108            "column '{name}' expected text, got {other:?}"
2109        ))),
2110    }
2111}
2112
2113fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
2114    let val = find_column_value(columns, values, name)?;
2115    match val {
2116        Value::Float(n) => Ok(n),
2117        Value::Integer(n) => Ok(n as f64),
2118        Value::UnsignedInteger(n) => Ok(n as f64),
2119        Value::Text(s) => s
2120            .parse::<f64>()
2121            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
2122        other => Err(RedDBError::Query(format!(
2123            "column '{name}' expected number, got {other:?}"
2124        ))),
2125    }
2126}
2127
2128/// Find an optional column value as String.
2129fn find_column_value_opt_string(
2130    columns: &[String],
2131    values: &[Value],
2132    name: &str,
2133) -> Option<String> {
2134    for (i, col) in columns.iter().enumerate() {
2135        if col.eq_ignore_ascii_case(name) {
2136            return match &values[i] {
2137                Value::Null => None,
2138                Value::Text(s) => Some(s.to_string()),
2139                Value::Integer(n) => Some(n.to_string()),
2140                Value::Float(n) => Some(n.to_string()),
2141                _ => None,
2142            };
2143        }
2144    }
2145    None
2146}
2147
/// Resolve an EDGE endpoint (`from`/`to`) to a numeric entity id.
///
/// Accepts integer literals, decimal strings, and node labels resolved via
/// the per-collection graph label index (same source of truth that
/// `GRAPH NEIGHBORHOOD` / `GRAPH TRAVERSE` use at query time). Ambiguous
/// labels error so callers can fall back to the numeric id form.
fn resolve_edge_endpoint(
    store: &crate::storage::unified::UnifiedStore,
    collection: &str,
    columns: &[String],
    values: &[Value],
    name: &str,
) -> RedDBResult<u64> {
    let val = find_column_value(columns, values, name)?;
    match val {
        // NOTE(review): a negative Integer wraps via `as u64` here —
        // presumably entity ids are never negative; confirm upstream.
        Value::Integer(n) => Ok(n as u64),
        Value::UnsignedInteger(n) => Ok(n),
        Value::Text(s) => {
            // Numeric strings are ids; anything else is treated as a
            // graph node label and resolved through the label index.
            if let Ok(n) = s.parse::<u64>() {
                return Ok(n);
            }
            let matches = store.lookup_graph_nodes_by_label_in(collection, &s);
            match matches.len() {
                0 => Err(RedDBError::Query(format!(
                    "column '{name}': no graph node with label '{s}' in collection '{collection}'"
                ))),
                // Exactly one match: unambiguous resolution.
                1 => Ok(matches[0].raw()),
                n => Err(RedDBError::Query(format!(
                    "column '{name}': ambiguous label '{s}' matches {n} nodes in collection '{collection}'; use the numeric id"
                ))),
            }
        }
        other => Err(RedDBError::Query(format!(
            "column '{name}' expected integer or node label, got {other:?}"
        ))),
    }
}
2185
2186/// Find a required column value and coerce to u64.
2187fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
2188    let val = find_column_value(columns, values, name)?;
2189    match val {
2190        Value::Integer(n) => Ok(n as u64),
2191        Value::UnsignedInteger(n) => Ok(n),
2192        Value::Text(s) => s
2193            .parse::<u64>()
2194            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
2195        other => Err(RedDBError::Query(format!(
2196            "column '{name}' expected integer, got {other:?}"
2197        ))),
2198    }
2199}
2200
2201/// Find an optional column value as f32.
2202fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
2203    for (i, col) in columns.iter().enumerate() {
2204        if col.eq_ignore_ascii_case(name) {
2205            return match &values[i] {
2206                Value::Float(n) => Some(*n as f32),
2207                Value::Integer(n) => Some(*n as f32),
2208                Value::Null => None,
2209                _ => None,
2210            };
2211        }
2212    }
2213    None
2214}
2215
2216/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
2217fn find_column_value_vec_f32(
2218    columns: &[String],
2219    values: &[Value],
2220    name: &str,
2221) -> RedDBResult<Vec<f32>> {
2222    let val = find_column_value(columns, values, name)?;
2223    match val {
2224        Value::Vector(v) => Ok(v),
2225        Value::Json(bytes) => {
2226            // Try to parse as JSON array of numbers
2227            let s = std::str::from_utf8(&bytes).map_err(|_| {
2228                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
2229            })?;
2230            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
2231                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
2232            })?;
2233            Ok(arr)
2234        }
2235        other => Err(RedDBError::Query(format!(
2236            "column '{name}' expected vector, got {other:?}"
2237        ))),
2238    }
2239}
2240
2241fn find_column_value_vec_f32_any(
2242    columns: &[String],
2243    values: &[Value],
2244    names: &[&str],
2245) -> RedDBResult<Vec<f32>> {
2246    for name in names {
2247        if columns
2248            .iter()
2249            .any(|column| column.eq_ignore_ascii_case(name))
2250        {
2251            return find_column_value_vec_f32(columns, values, name);
2252        }
2253    }
2254    Err(RedDBError::Query(format!(
2255        "required vector column '{}' not found in INSERT",
2256        names.join("' or '")
2257    )))
2258}
2259
2260/// Extract remaining properties (all columns not in the exclusion list).
2261fn extract_remaining_properties(
2262    columns: &[String],
2263    values: &[Value],
2264    exclude: &[&str],
2265) -> Vec<(String, Value)> {
2266    columns
2267        .iter()
2268        .zip(values.iter())
2269        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
2270        .map(|(col, val)| (col.clone(), val.clone()))
2271        .collect()
2272}
2273
2274fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
2275    let mut invalid = Vec::new();
2276    for column in columns {
2277        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
2278            invalid.push(column.clone());
2279        }
2280    }
2281
2282    if invalid.is_empty() {
2283        Ok(())
2284    } else {
2285        Err(RedDBError::Query(format!(
2286            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
2287            invalid.join(", ")
2288        )))
2289    }
2290}
2291
/// True when `column` (ASCII case-insensitive) is one of the fixed
/// timeseries INSERT column names.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 6] = ["metric", "value", "tags", "timestamp", "timestamp_ns", "time"];
    ACCEPTED
        .iter()
        .any(|candidate| column.eq_ignore_ascii_case(candidate))
}
2298
2299fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
2300    let mut found = None;
2301
2302    for alias in ["timestamp_ns", "timestamp", "time"] {
2303        for (index, column) in columns.iter().enumerate() {
2304            if !column.eq_ignore_ascii_case(alias) {
2305                continue;
2306            }
2307
2308            if found.is_some() {
2309                return Err(RedDBError::Query(
2310                    "timeseries INSERT accepts only one timestamp column".to_string(),
2311                ));
2312            }
2313
2314            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
2315        }
2316    }
2317
2318    Ok(found)
2319}
2320
2321fn find_timeseries_tags(
2322    columns: &[String],
2323    values: &[Value],
2324) -> RedDBResult<std::collections::HashMap<String, String>> {
2325    for (index, column) in columns.iter().enumerate() {
2326        if column.eq_ignore_ascii_case("tags") {
2327            return parse_timeseries_tags(&values[index]);
2328        }
2329    }
2330    Ok(std::collections::HashMap::new())
2331}
2332
2333fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
2334    match value {
2335        Value::Null => Ok(std::collections::HashMap::new()),
2336        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
2337        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
2338        other => Err(RedDBError::Query(format!(
2339            "timeseries tags must be a JSON object or JSON text, got {other:?}"
2340        ))),
2341    }
2342}
2343
2344fn parse_timeseries_tags_json(
2345    bytes: &[u8],
2346) -> RedDBResult<std::collections::HashMap<String, String>> {
2347    let json: crate::json::Value = crate::json::from_slice(bytes)
2348        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
2349
2350    let object = match json {
2351        crate::json::Value::Object(object) => object,
2352        other => {
2353            return Err(RedDBError::Query(format!(
2354                "timeseries tags must be a JSON object, got {other:?}"
2355            )))
2356        }
2357    };
2358
2359    let mut tags = std::collections::HashMap::with_capacity(object.len());
2360    for (key, value) in object {
2361        tags.insert(key, json_tag_value_to_string(&value));
2362    }
2363    Ok(tags)
2364}
2365
2366fn json_tag_value_to_string(value: &crate::json::Value) -> String {
2367    match value {
2368        crate::json::Value::Null => "null".to_string(),
2369        crate::json::Value::Bool(value) => value.to_string(),
2370        crate::json::Value::Number(value) => value.to_string(),
2371        crate::json::Value::String(value) => value.clone(),
2372        other => other.to_string(),
2373    }
2374}
2375
2376fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
2377    match value {
2378        Value::UnsignedInteger(value) => Ok(*value),
2379        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
2380        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
2381        Value::Text(value) => value.parse::<u64>().map_err(|_| {
2382            RedDBError::Query(format!(
2383                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
2384            ))
2385        }),
2386        other => Err(RedDBError::Query(format!(
2387            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
2388        ))),
2389    }
2390}
2391
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Saturates at `u64::MAX` far in the future and falls back to 0 when the
/// system clock reads before the epoch.
fn current_unix_ns() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
2399
2400fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
2401    use crate::json::{Map, Value as JV};
2402    match value {
2403        MetadataValue::Null => JV::Null,
2404        MetadataValue::Bool(value) => JV::Bool(*value),
2405        MetadataValue::Int(value) => JV::Number(*value as f64),
2406        MetadataValue::Float(value) => JV::Number(*value),
2407        MetadataValue::String(value) => JV::String(value.clone()),
2408        MetadataValue::Bytes(value) => JV::Array(
2409            value
2410                .iter()
2411                .map(|value| JV::Number(*value as f64))
2412                .collect(),
2413        ),
2414        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
2415        MetadataValue::Array(values) => {
2416            JV::Array(values.iter().map(metadata_value_to_json).collect())
2417        }
2418        MetadataValue::Object(object) => {
2419            let entries = object
2420                .iter()
2421                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
2422                .collect();
2423            JV::Object(entries)
2424        }
2425        MetadataValue::Geo { lat, lon } => {
2426            let mut object = Map::new();
2427            object.insert("lat".to_string(), JV::Number(*lat));
2428            object.insert("lon".to_string(), JV::Number(*lon));
2429            JV::Object(object)
2430        }
2431        MetadataValue::Reference(target) => {
2432            let mut object = Map::new();
2433            object.insert(
2434                "collection".to_string(),
2435                JV::String(target.collection().to_string()),
2436            );
2437            object.insert(
2438                "entity_id".to_string(),
2439                JV::Number(target.entity_id().raw() as f64),
2440            );
2441            JV::Object(object)
2442        }
2443        MetadataValue::References(values) => {
2444            let refs = values
2445                .iter()
2446                .map(|target| {
2447                    let mut object = Map::new();
2448                    object.insert(
2449                        "collection".to_string(),
2450                        JV::String(target.collection().to_string()),
2451                    );
2452                    object.insert(
2453                        "entity_id".to_string(),
2454                        JV::Number(target.entity_id().raw() as f64),
2455                    );
2456                    JV::Object(object)
2457                })
2458                .collect();
2459            JV::Array(refs)
2460        }
2461    }
2462}
2463
2464fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
2465    match value {
2466        Value::Null => MetadataValue::Null,
2467        Value::Boolean(value) => MetadataValue::Bool(*value),
2468        Value::Integer(value) => MetadataValue::Int(*value),
2469        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
2470        Value::Float(value) => MetadataValue::Float(*value),
2471        Value::Text(value) => MetadataValue::String(value.to_string()),
2472        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
2473        Value::Timestamp(value) => {
2474            if *value >= 0 {
2475                metadata_u64_to_value(*value as u64)
2476            } else {
2477                MetadataValue::Int(*value)
2478            }
2479        }
2480        Value::TimestampMs(value) => {
2481            if *value >= 0 {
2482                metadata_u64_to_value(*value as u64)
2483            } else {
2484                MetadataValue::Int(*value)
2485            }
2486        }
2487        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
2488        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
2489        Value::Date(value) => MetadataValue::String(value.to_string()),
2490        Value::Time(value) => MetadataValue::String(value.to_string()),
2491        Value::Decimal(value) => MetadataValue::String(value.to_string()),
2492        Value::Ipv4(value) => MetadataValue::String(format!(
2493            "{}.{}.{}.{}",
2494            (value >> 24) & 0xFF,
2495            (value >> 16) & 0xFF,
2496            (value >> 8) & 0xFF,
2497            value & 0xFF
2498        )),
2499        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
2500        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
2501        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
2502        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
2503            lat: *lat as f64 / 1_000_000.0,
2504            lon: *lon as f64 / 1_000_000.0,
2505        },
2506        Value::BigInt(value) => MetadataValue::String(value.to_string()),
2507        Value::TableRef(value) => MetadataValue::String(value.clone()),
2508        Value::PageRef(value) => MetadataValue::Int(*value as i64),
2509        Value::Password(value) => MetadataValue::String(value.clone()),
2510        Value::Array(values) => {
2511            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
2512        }
2513        _ => MetadataValue::String(value.to_string()),
2514    }
2515}
2516
2517fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
2518    match value {
2519        Value::Null => Ok(MetadataValue::Null),
2520        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
2521        Value::Integer(_) => Err(RedDBError::Query(format!(
2522            "column '{field}' must be non-negative for TTL metadata"
2523        ))),
2524        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
2525        Value::Float(value) if value.is_finite() => {
2526            if value.fract().abs() >= f64::EPSILON {
2527                return Err(RedDBError::Query(format!(
2528                    "column '{field}' must be an integer (TTL metadata must be an integer)"
2529                )));
2530            }
2531            if *value < 0.0 {
2532                return Err(RedDBError::Query(format!(
2533                    "column '{field}' must be non-negative for TTL metadata"
2534                )));
2535            }
2536            if *value > u64::MAX as f64 {
2537                return Err(RedDBError::Query(format!(
2538                    "column '{field}' value is too large"
2539                )));
2540            }
2541            Ok(metadata_u64_to_value(*value as u64))
2542        }
2543        Value::Float(_) => Err(RedDBError::Query(format!(
2544            "column '{field}' must be a finite number"
2545        ))),
2546        Value::Text(value) => {
2547            let value = value.trim();
2548            if let Ok(value) = value.parse::<u64>() {
2549                Ok(metadata_u64_to_value(value))
2550            } else if let Ok(value) = value.parse::<i64>() {
2551                if value < 0 {
2552                    return Err(RedDBError::Query(format!(
2553                        "column '{field}' must be non-negative for TTL metadata"
2554                    )));
2555                }
2556                Ok(metadata_u64_to_value(value as u64))
2557            } else if let Ok(value) = value.parse::<f64>() {
2558                if !value.is_finite() {
2559                    return Err(RedDBError::Query(format!(
2560                        "column '{field}' must be a finite number"
2561                    )));
2562                }
2563                if value.fract().abs() >= f64::EPSILON {
2564                    return Err(RedDBError::Query(format!(
2565                        "column '{field}' must be an integer (TTL metadata must be an integer)"
2566                    )));
2567                }
2568                if value < 0.0 {
2569                    return Err(RedDBError::Query(format!(
2570                        "column '{field}' must be non-negative for TTL metadata"
2571                    )));
2572                }
2573                if value > u64::MAX as f64 {
2574                    return Err(RedDBError::Query(format!(
2575                        "column '{field}' value is too large"
2576                    )));
2577                }
2578                Ok(metadata_u64_to_value(value as u64))
2579            } else {
2580                Err(RedDBError::Query(format!(
2581                    "column '{field}' expects a numeric value for TTL metadata"
2582                )))
2583            }
2584        }
2585        _ => Err(RedDBError::Query(format!(
2586            "column '{field}' expects a numeric value for TTL metadata"
2587        ))),
2588    }
2589}
2590
2591fn metadata_u64_to_value(value: u64) -> MetadataValue {
2592    if value <= i64::MAX as u64 {
2593        MetadataValue::Int(value as i64)
2594    } else {
2595        MetadataValue::Timestamp(value)
2596    }
2597}
2598
2599/// Phase 2 PG parity: inspect a column value and return `true` when
2600/// the dotted `tail` path is already present under it. Used by the
2601/// tenant auto-fill so rows that already carry an explicit value
2602/// (bulk import, admin insert on behalf of a tenant) are not
2603/// double-stamped with the session's current_tenant().
2604fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
2605    let json = match value {
2606        Value::Null => return false,
2607        Value::Json(bytes) | Value::Blob(bytes) => {
2608            match crate::json::from_slice::<crate::json::Value>(bytes) {
2609                Ok(v) => v,
2610                Err(_) => return false,
2611            }
2612        }
2613        Value::Text(s) => {
2614            let trimmed = s.trim_start();
2615            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
2616                return false;
2617            }
2618            match crate::json::from_str::<crate::json::Value>(s) {
2619                Ok(v) => v,
2620                Err(_) => return false,
2621            }
2622        }
2623        _ => return false,
2624    };
2625    let mut cursor = &json;
2626    for seg in tail.split('.') {
2627        match cursor {
2628            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
2629                Some((_, v)) => cursor = v,
2630                None => return false,
2631            },
2632            _ => return false,
2633        }
2634    }
2635    !matches!(cursor, crate::json::Value::Null)
2636}
2637
/// Phase 2 PG parity: take a column value (possibly Null / Text /
/// Json) and return a `Value::Json` with the dotted `tail` path set
/// to `tenant_id`. Preserves every pre-existing key.
///
/// Accepts:
/// * `Value::Null`  → fresh `{tail: tenant_id}` object
/// * `Value::Json(bytes)` / `Value::Blob(bytes)` → parse, navigate /
///   create path, re-serialize
/// * `Value::Text(s)` if `s` is valid JSON (or blank) → same as Json
/// * anything else → error (user supplied a scalar where we need
///   a JSON container)
///
/// # Errors
/// Returns `RedDBError::Query` when the root is not a JSON container,
/// when parsing or re-serialization fails, or when an intermediate path
/// segment resolves to a non-object value.
fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
    let mut root = match current {
        Value::Null => crate::json::Value::Object(Default::default()),
        Value::Json(bytes) | Value::Blob(bytes) => {
            crate::json::from_slice(&bytes).map_err(|err| {
                RedDBError::Query(format!(
                    "tenant auto-fill: root column is not valid JSON ({err})"
                ))
            })?
        }
        Value::Text(s) => {
            // Blank text is treated like NULL: start from an empty object.
            if s.trim().is_empty() {
                crate::json::Value::Object(Default::default())
            } else {
                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
                    RedDBError::Query(format!(
                        "tenant auto-fill: text root is not valid JSON ({err})"
                    ))
                })?
            }
        }
        other => {
            return Err(RedDBError::Query(format!(
                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
            )));
        }
    };

    // Navigate path segments, creating intermediate objects on demand.
    let segments: Vec<&str> = tail.split('.').collect();
    let mut cursor: &mut crate::json::Value = &mut root;
    for (i, seg) in segments.iter().enumerate() {
        let is_last = i + 1 == segments.len();
        let map = match cursor {
            crate::json::Value::Object(m) => m,
            _ => {
                // An intermediate segment landed on a scalar/array: refuse
                // to clobber user data.
                return Err(RedDBError::Query(format!(
                    "tenant auto-fill: segment '{seg}' is not inside an object"
                )));
            }
        };
        if is_last {
            // Leaf segment: (over)write the tenant id and stop walking.
            map.insert(
                seg.to_string(),
                crate::json::Value::String(tenant_id.to_string()),
            );
            break;
        }
        cursor = map
            .entry(seg.to_string())
            .or_insert_with(|| crate::json::Value::Object(Default::default()));
    }

    let bytes = crate::json::to_vec(&root).map_err(|err| {
        RedDBError::Query(format!(
            "tenant auto-fill: failed to re-serialize JSON ({err})"
        ))
    })?;
    Ok(Value::Json(bytes))
}
2708
2709#[cfg(test)]
2710mod tests {
2711    use crate::storage::schema::Value;
2712    use crate::{RedDBOptions, RedDBRuntime};
2713
    /// UPDATE … WHERE id IN (…) backed by a HASH index must touch exactly
    /// the listed rows (ids 1, 3, 4) and leave every other row untouched.
    #[test]
    fn update_where_id_in_with_hash_index_updates_expected_rows() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, score INT)")
            .unwrap();
        for id in 0..5 {
            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
                .unwrap();
        }
        // Index created after the inserts so it must be backfilled.
        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
            .unwrap();

        let updated = rt
            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
            .unwrap();
        assert_eq!(updated.affected_rows, 3);

        // Snapshot (id, score) pairs to verify exactly which rows changed.
        let selected = rt
            .execute_query("SELECT id, score FROM users ORDER BY id")
            .unwrap();
        let scores: Vec<(i64, i64)> = selected
            .result
            .records
            .iter()
            .map(|record| {
                let id = match record.get("id").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer id, got {other:?}"),
                };
                let score = match record.get("score").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer score, got {other:?}"),
                };
                (id, score)
            })
            .collect();
        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
    }
2752
    /// Drives UPDATE through the shared `DmlTargetScan` module — the
    /// same code path DELETE uses (#51, #52). Exercises the indexed
    /// equality fast-path (WHERE id = N with a HASH index), the
    /// unindexed range scan (WHERE score > N), and the no-WHERE
    /// full-scan branch to confirm the extracted "find target rows"
    /// loop preserves affected-row counts and the resulting row state.
    #[test]
    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE items (id INT, score INT)")
            .unwrap();
        // Seed rows with score = id * 10 so range predicates are easy to reason about.
        for id in 0..5 {
            rt.execute_query(&format!(
                "INSERT INTO items (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
            .unwrap();

        // Indexed equality UPDATE — hits the hash fast-path inside
        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
        // below the score>25 cutoff so the next assertion stays clean.
        let updated_one = rt
            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
            .unwrap();
        assert_eq!(updated_one.affected_rows, 1);

        // Unindexed scan UPDATE — bumps everyone with score > 25,
        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
        // zoned/full-scan branch.
        let updated_many = rt
            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
            .unwrap();
        assert_eq!(updated_many.affected_rows, 2);

        // Verify the combined effect of both UPDATEs row-by-row.
        let snapshot = rt
            .execute_query("SELECT id, score FROM items ORDER BY id")
            .unwrap();
        let pairs: Vec<(i64, i64)> = snapshot
            .result
            .records
            .iter()
            .map(|record| {
                let id = match record.get("id").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer id, got {other:?}"),
                };
                let score = match record.get("score").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer score, got {other:?}"),
                };
                (id, score)
            })
            .collect();
        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);

        // Full-scan UPDATE with no WHERE rewrites every remaining row.
        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
        assert_eq!(updated_all.affected_rows, 5);
        let after = rt
            .execute_query("SELECT score FROM items ORDER BY id")
            .unwrap();
        let scores: Vec<i64> = after
            .result
            .records
            .iter()
            .map(|record| match record.get("score").unwrap() {
                Value::Integer(value) => *value,
                other => panic!("expected integer score, got {other:?}"),
            })
            .collect();
        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
    }
2828
    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
    /// both the index fast-path (WHERE id = N with a HASH index) and
    /// the unindexed scan path (WHERE score > N) to confirm the
    /// extracted "find target rows" loop preserves the affected-row
    /// count and which rows survive.
    #[test]
    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE items (id INT, score INT)")
            .unwrap();
        // Seed rows with score = id * 10 so range predicates are predictable.
        for id in 0..5 {
            rt.execute_query(&format!(
                "INSERT INTO items (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
            .unwrap();

        // Indexed equality DELETE — hits the hash fast-path inside
        // DmlTargetScan::find_target_ids.
        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
        assert_eq!(deleted_one.affected_rows, 1);

        // Unindexed scan DELETE — drops everyone with score > 25,
        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
        // zoned/full-scan branch.
        let deleted_many = rt
            .execute_query("DELETE FROM items WHERE score > 25")
            .unwrap();
        assert_eq!(deleted_many.affected_rows, 2);

        // Only ids 0 and 1 should remain after both DELETEs.
        let surviving = rt
            .execute_query("SELECT id FROM items ORDER BY id")
            .unwrap();
        let ids: Vec<i64> = surviving
            .result
            .records
            .iter()
            .map(|record| match record.get("id").unwrap() {
                Value::Integer(value) => *value,
                other => panic!("expected integer id, got {other:?}"),
            })
            .collect();
        assert_eq!(ids, vec![0, 1]);

        // Sanity: full-scan DELETE with no WHERE clears the rest.
        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
        assert_eq!(deleted_rest.affected_rows, 2);
        let empty = rt.execute_query("SELECT id FROM items").unwrap();
        assert!(empty.result.records.is_empty());
    }
2882
    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
    /// INSERT but reject UPDATE and DELETE with the documented
    /// operator-facing error strings. Drives all three DML verbs so
    /// the centralized gate is exercised end-to-end.
    #[test]
    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
            .unwrap();

        // INSERT must succeed — APPEND ONLY exists precisely to allow
        // appends. The gate should be a no-op for INSERT.
        let inserted = rt
            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
            .unwrap();
        assert_eq!(inserted.affected_rows, 1);

        // UPDATE is rejected with the gate's UPDATE-specific message.
        let update_err = rt
            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
            .unwrap_err();
        let msg = format!("{update_err}");
        assert!(
            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
            "expected UPDATE rejection message, got: {msg}"
        );

        // DELETE is rejected with the gate's DELETE-specific message.
        let delete_err = rt
            .execute_query("DELETE FROM events WHERE id = 1")
            .unwrap_err();
        let msg = format!("{delete_err}");
        assert!(
            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
            "expected DELETE rejection message, got: {msg}"
        );

        // Row should still be present — neither rejected mutation
        // touched storage.
        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
        assert_eq!(surviving.result.records.len(), 1);
    }
2925
2926    /// CollectionContract gate: tables without an APPEND ONLY contract
2927    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
2928    /// is a true pass-through, not an accidental block.
2929    #[test]
2930    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
2931        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2932        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
2933            .unwrap();
2934
2935        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
2936            .unwrap();
2937        let updated = rt
2938            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
2939            .unwrap();
2940        assert_eq!(updated.affected_rows, 1);
2941        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
2942        assert_eq!(deleted.affected_rows, 1);
2943    }
2944
    /// INSERT into a table created `WITH EVENTS (INSERT) TO audit_log`
    /// emits exactly one event to the named queue, carrying the full
    /// envelope asserted below: event_id, op, collection, id, ts, lsn,
    /// tenant, before (null for inserts), and after (the inserted row).
    #[test]
    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
        )
        .unwrap();

        let inserted = rt
            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
            .unwrap();
        assert_eq!(inserted.affected_rows, 1);

        let events = queue_payloads(&rt, "audit_log");
        assert_eq!(events.len(), 1);
        let event = events[0].as_object().expect("event payload object");
        // event_id is an opaque, non-empty string identifier.
        assert!(event
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|value| !value.is_empty()));
        assert_eq!(
            event.get("op").and_then(crate::json::Value::as_str),
            Some("insert")
        );
        assert_eq!(
            event.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            event.get("id").and_then(crate::json::Value::as_u64),
            Some(7)
        );
        // Timestamp and log sequence number must be present; their
        // exact values are runtime-dependent and not pinned here.
        assert!(event
            .get("ts")
            .and_then(crate::json::Value::as_u64)
            .is_some());
        assert!(event
            .get("lsn")
            .and_then(crate::json::Value::as_u64)
            .is_some());
        // No tenant is configured on this runtime, and an insert has no
        // prior row image: both fields must be explicit JSON nulls, not
        // absent keys.
        assert!(matches!(
            event.get("tenant"),
            Some(crate::json::Value::Null)
        ));
        assert!(matches!(
            event.get("before"),
            Some(crate::json::Value::Null)
        ));
        // `after` mirrors the inserted row's columns.
        let after = event
            .get("after")
            .and_then(crate::json::Value::as_object)
            .expect("after object");
        assert_eq!(
            after.get("id").and_then(crate::json::Value::as_u64),
            Some(7)
        );
        assert_eq!(
            after.get("email").and_then(crate::json::Value::as_str),
            Some("a@example.com")
        );
    }
3006
    /// Multi-row INSERT emits one insert event per row, in row order.
    /// With bare `WITH EVENTS` (no `TO` target), events land on the
    /// default `<table>_events` queue — here `users_events`.
    #[test]
    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
            .unwrap();

        rt.execute_query(
            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
        )
        .unwrap();

        let events = queue_payloads(&rt, "users_events");
        assert_eq!(events.len(), 2);
        let mut previous_lsn = 0;
        // Pair each dequeued event with the row id it should describe;
        // events must arrive in insertion order.
        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
            let object = event.as_object().expect("event payload object");
            assert_eq!(
                object.get("op").and_then(crate::json::Value::as_str),
                Some("insert")
            );
            assert_eq!(
                object.get("id").and_then(crate::json::Value::as_u64),
                Some(expected_id)
            );
            // LSNs must be strictly increasing across the per-row
            // events of a single statement.
            let lsn = object
                .get("lsn")
                .and_then(crate::json::Value::as_u64)
                .expect("event lsn");
            assert!(
                lsn > previous_lsn,
                "event LSNs should increase in row order"
            );
            previous_lsn = lsn;
            let after = object
                .get("after")
                .and_then(crate::json::Value::as_object)
                .expect("after object");
            assert_eq!(
                after.get("id").and_then(crate::json::Value::as_u64),
                Some(expected_id)
            );
        }
    }
3050
3051    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
3052        let result = rt
3053            .execute_query(&format!("QUEUE PEEK {queue} 10"))
3054            .expect("peek queue");
3055        result
3056            .result
3057            .records
3058            .iter()
3059            .map(
3060                |record| match record.get("payload").expect("payload column") {
3061                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
3062                    other => panic!("expected JSON queue payload, got {other:?}"),
3063                },
3064            )
3065            .collect()
3066    }
3067
3068    // ── #112: auto-index user `id` on first insert ─────────────────────
3069
    /// First insert into a fresh collection that carries a column named
    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
    /// populate it transparently, and `WHERE id = N` lookups exercise
    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
    ///
    /// This is the load-bearing acceptance test for #112 — without the
    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
    /// fall through to a full segment scan (the 4× perf gap documented
    /// in `docs/perf/delete-sequential-2026-05-06.md`).
    #[test]
    fn auto_index_id_fires_on_first_insert() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
            .unwrap();

        // Pre-condition: no index on `id` yet.
        assert!(
            rt.index_store_ref()
                .find_index_for_column("bench_users", "id")
                .is_none(),
            "freshly created collection should not have an `id` index"
        );

        // Single-row INSERT — drives `MutationEngine::append_one`.
        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
            .unwrap();

        // Post-condition: hash index registered on `id`.
        // Pin every descriptor field so a regression in name, target
        // collection, column set, or method shows up individually.
        let registered = rt
            .index_store_ref()
            .find_index_for_column("bench_users", "id")
            .expect("auto-index hook should have registered idx_id on first insert");
        assert_eq!(registered.name, "idx_id");
        assert_eq!(registered.collection, "bench_users");
        assert_eq!(registered.columns, vec!["id".to_string()]);
        assert!(matches!(
            registered.method,
            super::super::index_store::IndexMethodKind::Hash
        ));

        // Subsequent inserts populate the index; `WHERE id = N` should
        // resolve via the hash fast path and round-trip every row.
        for id in 2..=5 {
            rt.execute_query(&format!(
                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        // Range 1..=5 deliberately covers both the pre-hook row (id=1)
        // and the rows inserted after the index existed.
        for id in 1..=5 {
            let result = rt
                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
                .unwrap();
            assert_eq!(
                result.result.records.len(),
                1,
                "id={id} should match one row"
            );
        }

        // Delete via the hash fast-path — exactly the bench scenario the
        // perf doc identified as the 4× regression. With the index
        // present, `find_target_ids` short-circuits before
        // `for_each_entity_zoned` runs.
        let deleted = rt
            .execute_query("DELETE FROM bench_users WHERE id = 3")
            .unwrap();
        assert_eq!(deleted.affected_rows, 1);
    }
3139
3140    /// Bulk INSERT (the multi-row VALUES path) drives
3141    /// `MutationEngine::append_batch`. The hook must fire there too —
3142    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
3143    /// wire bulk INSERT) skip auto-indexing entirely.
3144    #[test]
3145    fn auto_index_id_fires_on_first_bulk_insert() {
3146        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3147        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
3148            .unwrap();
3149
3150        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
3151            .unwrap();
3152
3153        let registered = rt
3154            .index_store_ref()
3155            .find_index_for_column("bench_bulk", "id")
3156            .expect("auto-index hook should fire on first bulk insert");
3157        assert_eq!(registered.name, "idx_id");
3158
3159        // Every row populated via `index_entity_insert_batch`.
3160        for id in 1..=3 {
3161            let result = rt
3162                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
3163                .unwrap();
3164            assert_eq!(result.result.records.len(), 1);
3165        }
3166    }
3167
3168    /// Hook is a no-op when the row carries no `id` column. Conservative
3169    /// match (case-sensitive `id`) — `Id`, `ID`, and `red_entity_id`
3170    /// don't trigger it.
3171    #[test]
3172    fn auto_index_id_skips_when_no_id_column() {
3173        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3174        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
3175            .unwrap();
3176        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
3177            .unwrap();
3178
3179        assert!(rt
3180            .index_store_ref()
3181            .find_index_for_column("plain", "id")
3182            .is_none());
3183        assert!(rt
3184            .index_store_ref()
3185            .find_index_for_column("plain", "uid")
3186            .is_none());
3187    }
3188
3189    /// Hook only fires once per collection. If an explicit
3190    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
3191    /// detects it via `find_index_for_column` and does NOT clobber it
3192    /// with a HASH index on the next insert.
3193    #[test]
3194    fn auto_index_id_skips_when_index_already_exists() {
3195        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3196        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
3197            .unwrap();
3198        // User-declared BTREE index on `id` before any insert.
3199        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
3200            .unwrap();
3201        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
3202            .unwrap();
3203
3204        let registered = rt
3205            .index_store_ref()
3206            .find_index_for_column("pre", "id")
3207            .expect("user index should still be there");
3208        assert_eq!(
3209            registered.name, "user_idx",
3210            "auto-index hook must not overwrite an existing index"
3211        );
3212    }
3213
3214    /// Implicit `idx_id` is reaped when the collection drops. The
3215    /// existing `execute_drop_table` walks `list_indices` and drops every
3216    /// entry — confirm the auto-created index participates.
3217    #[test]
3218    fn auto_index_id_dropped_with_collection() {
3219        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3220        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
3221            .unwrap();
3222        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
3223            .unwrap();
3224        assert!(rt
3225            .index_store_ref()
3226            .find_index_for_column("ephemeral", "id")
3227            .is_some());
3228
3229        rt.execute_query("DROP TABLE ephemeral").unwrap();
3230
3231        assert!(
3232            rt.index_store_ref()
3233                .find_index_for_column("ephemeral", "id")
3234                .is_none(),
3235            "implicit `idx_id` must be reaped when its collection drops"
3236        );
3237    }
3238
3239    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
3240    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
3241    /// off, first insert leaves the collection without an `id` index —
3242    /// DELETE/UPDATE fall back to the scan path.
3243    #[test]
3244    fn auto_index_id_disabled_by_config() {
3245        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
3246        let rt = RedDBRuntime::with_options(opts).unwrap();
3247
3248        rt.execute_query("CREATE TABLE off (id INT, score INT)")
3249            .unwrap();
3250        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
3251            .unwrap();
3252
3253        assert!(
3254            rt.index_store_ref()
3255                .find_index_for_column("off", "id")
3256                .is_none(),
3257            "with auto_index_id=false, no implicit index should be created"
3258        );
3259    }
3260
3261    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
3262
    /// UPDATE on an event-enabled table emits a single "update" event
    /// whose `before`/`after` objects carry the old and new values of
    /// the changed column.
    #[test]
    fn update_single_row_emits_update_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
        )
        .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
            .unwrap();

        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
            .unwrap();

        let events = queue_payloads(&rt, "audit_log");
        assert_eq!(events.len(), 1, "expected exactly 1 update event");
        let event = events[0].as_object().expect("event payload object");
        assert_eq!(
            event.get("op").and_then(crate::json::Value::as_str),
            Some("update")
        );
        assert_eq!(
            event.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        // event_id is an opaque, non-empty string identifier.
        assert!(event
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|v| !v.is_empty()));
        // Unlike INSERT (before=null) and DELETE (after=null), an
        // update carries both row images as objects.
        let before = event
            .get("before")
            .and_then(crate::json::Value::as_object)
            .expect("before must be an object");
        let after = event
            .get("after")
            .and_then(crate::json::Value::as_object)
            .expect("after must be an object");
        assert_eq!(
            before.get("name").and_then(crate::json::Value::as_str),
            Some("Alice"),
            "before.name should be the old value"
        );
        assert_eq!(
            after.get("name").and_then(crate::json::Value::as_str),
            Some("Bob"),
            "after.name should be the new value"
        );
    }
3310
    /// Update events are delta-shaped: `before`/`after` include only
    /// the columns the UPDATE actually changed; untouched columns
    /// (here `email`) must be omitted from both objects.
    #[test]
    fn update_event_only_includes_changed_fields() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
        )
        .unwrap();
        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
            .unwrap();

        // Touch `name` only; `email` stays at its inserted value.
        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
            .unwrap();

        let events = queue_payloads(&rt, "evts");
        assert_eq!(events.len(), 1);
        let event = events[0].as_object().unwrap();
        let before = event
            .get("before")
            .and_then(crate::json::Value::as_object)
            .unwrap();
        let after = event
            .get("after")
            .and_then(crate::json::Value::as_object)
            .unwrap();
        // Only changed field included.
        assert!(
            before.contains_key("name"),
            "before must include changed field"
        );
        assert!(
            after.contains_key("name"),
            "after must include changed field"
        );
        // Unchanged fields must not appear.
        assert!(
            !before.contains_key("email"),
            "before must not include unchanged email"
        );
        assert!(
            !after.contains_key("email"),
            "after must not include unchanged email"
        );
    }
3354
3355    #[test]
3356    fn multi_row_update_emits_one_event_per_row() {
3357        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3358        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
3359            .unwrap();
3360        rt.execute_query(
3361            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
3362        )
3363        .unwrap();
3364
3365        rt.execute_query("UPDATE items SET status = 'done'")
3366            .unwrap();
3367
3368        let events = queue_payloads(&rt, "evts");
3369        assert_eq!(events.len(), 3, "expected one update event per row");
3370        for event in &events {
3371            let obj = event.as_object().unwrap();
3372            assert_eq!(
3373                obj.get("op").and_then(crate::json::Value::as_str),
3374                Some("update")
3375            );
3376        }
3377    }
3378
    /// DELETE emits a "delete" event: `before` carries the full deleted
    /// row image and `after` is an explicit JSON null.
    #[test]
    fn delete_single_row_emits_delete_event() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
            .unwrap();
        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
            .unwrap();

        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();

        let events = queue_payloads(&rt, "del_log");
        assert_eq!(events.len(), 1);
        let event = events[0].as_object().expect("event payload object");
        assert_eq!(
            event.get("op").and_then(crate::json::Value::as_str),
            Some("delete")
        );
        assert_eq!(
            event.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        // event_id is an opaque, non-empty string identifier.
        assert!(event
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|v| !v.is_empty()));
        // `before` is the last row image prior to deletion — both
        // columns of the deleted row are present.
        let before = event
            .get("before")
            .and_then(crate::json::Value::as_object)
            .expect("before must be an object for delete");
        assert_eq!(
            before.get("id").and_then(crate::json::Value::as_u64),
            Some(42)
        );
        assert_eq!(
            before.get("name").and_then(crate::json::Value::as_str),
            Some("Alice")
        );
        // Deletes have no post-image: `after` is an explicit null.
        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
    }
3418
3419    #[test]
3420    fn multi_row_delete_emits_one_event_per_row() {
3421        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3422        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
3423            .unwrap();
3424        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
3425            .unwrap();
3426
3427        rt.execute_query("DELETE FROM items").unwrap();
3428
3429        let events = queue_payloads(&rt, "del_log");
3430        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
3431        for event in &events {
3432            let obj = event.as_object().unwrap();
3433            assert_eq!(
3434                obj.get("op").and_then(crate::json::Value::as_str),
3435                Some("delete")
3436            );
3437            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
3438        }
3439    }
3440
3441    #[test]
3442    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
3443        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3444        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
3445            .unwrap();
3446
3447        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
3448            .unwrap();
3449        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3450
3451        let events = queue_payloads(&rt, "evts");
3452        assert!(
3453            events.is_empty(),
3454            "UPDATE-only filter must not emit INSERT or DELETE events"
3455        );
3456    }
3457
3458    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
3459
3460    #[test]
3461    fn suppress_events_on_insert_emits_no_events() {
3462        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3463        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3464            .unwrap();
3465
3466        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3467            .unwrap();
3468
3469        let events = queue_payloads(&rt, "evts");
3470        assert!(
3471            events.is_empty(),
3472            "SUPPRESS EVENTS must prevent INSERT events"
3473        );
3474    }
3475
3476    #[test]
3477    fn suppress_events_on_update_emits_no_events() {
3478        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3479        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3480            .unwrap();
3481        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
3482            .unwrap();
3483        // drain the INSERT event
3484        let _ = queue_payloads(&rt, "evts");
3485        // Force pop to drain; simpler: just check new count after UPDATE
3486        rt.execute_query("QUEUE PURGE evts").unwrap();
3487
3488        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
3489            .unwrap();
3490
3491        let events = queue_payloads(&rt, "evts");
3492        assert!(
3493            events.is_empty(),
3494            "SUPPRESS EVENTS must prevent UPDATE events"
3495        );
3496    }
3497
3498    #[test]
3499    fn suppress_events_on_delete_emits_no_events() {
3500        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3501        rt.execute_query(
3502            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
3503        )
3504        .unwrap();
3505        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3506            .unwrap();
3507
3508        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
3509            .unwrap();
3510
3511        let events = queue_payloads(&rt, "evts");
3512        assert!(
3513            events.is_empty(),
3514            "SUPPRESS EVENTS must prevent DELETE events"
3515        );
3516    }
3517
3518    #[test]
3519    fn normal_insert_after_suppress_still_emits() {
3520        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3521        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3522            .unwrap();
3523
3524        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3525            .unwrap();
3526        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
3527            .unwrap();
3528
3529        let events = queue_payloads(&rt, "evts");
3530        assert_eq!(
3531            events.len(),
3532            1,
3533            "only the non-suppressed INSERT should emit"
3534        );
3535        assert_eq!(
3536            events[0].get("id").and_then(crate::json::Value::as_u64),
3537            Some(2)
3538        );
3539    }
3540}