// reddb_server/runtime/impl_dml.rs
//! DML execution: INSERT, UPDATE, DELETE via SQL AST
//!
//! Implements `execute_insert`, `execute_update`, and `execute_delete` on
//! `RedDBRuntime`.  Each method translates the parsed AST into entity-level
//! operations through the existing `RuntimeEntityPort` trait so that all
//! cross-cutting concerns (WAL, indexing, replication) are automatically
//! applied.

use crate::application::entity::{
    AppliedEntityMutation, CreateDocumentInput, CreateEdgeInput, CreateKvInput, CreateNodeInput,
    CreateRowInput, CreateRowsBatchInput, CreateVectorInput, DeleteEntityInput,
    PatchEntityOperation, PatchEntityOperationType, RowUpdateColumnRule, RowUpdateContractPlan,
};
use crate::application::ports::{
    build_row_update_contract_plan, normalize_row_update_assignment_with_plan,
    normalize_row_update_value_for_rule, RuntimeEntityPort,
};
use crate::application::ttl_payload::has_internal_ttl_metadata;
use crate::presentation::entity_json::storage_value_to_json;
use crate::storage::query::ast::{Expr, ReturningItem};
use crate::storage::query::sql_lowering::{
    effective_delete_filter, effective_insert_rows, effective_update_filter, fold_expr_to_value,
};
use crate::storage::query::unified::{sys_key_red_entity_id, UnifiedRecord, UnifiedResult};
use crate::storage::unified::MetadataValue;
use crate::storage::Metadata;
use std::collections::HashMap;
use std::sync::Arc;

use super::*;

32const UPDATE_APPLY_CHUNK_SIZE: usize = 2048;
33const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
34const TREE_METADATA_PREFIX: &str = "red.tree.";
35
36#[derive(Clone)]
37struct CompiledUpdateAssignment {
38    column: String,
39    expr: Expr,
40    metadata_key: Option<&'static str>,
41    row_rule: Option<RowUpdateColumnRule>,
42}
43
44struct CompiledUpdatePlan {
45    static_field_assignments: Vec<(String, Value)>,
46    static_metadata_assignments: Vec<(String, MetadataValue)>,
47    dynamic_assignments: Vec<CompiledUpdateAssignment>,
48    row_contract_plan: Option<RowUpdateContractPlan>,
49    row_modified_columns: Vec<String>,
50    row_touches_unique_columns: bool,
51}
52
53#[derive(Default)]
54struct MaterializedUpdateAssignments {
55    dynamic_field_assignments: Vec<(String, Value)>,
56    dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
57}
58
59impl RedDBRuntime {
    /// Phase 2.5.4: inject `CURRENT_TENANT()` into an INSERT when the
    /// target table is tenant-scoped and the user's column list does
    /// not already name the tenant column.
    ///
    /// Returns:
    /// * `Ok(None)` — no injection needed (non-tenant table, or user
    ///   supplied the column explicitly). Caller uses the original
    ///   query unchanged.
    /// * `Ok(Some(augmented))` — a cloned query with the tenant column
    ///   + literal value appended to every row.
    /// * `Err(..)` — table is tenant-scoped but no tenant is bound to
    ///   the current session. Fails loudly so callers don't produce
    ///   rows that RLS would then hide on read.
73    fn maybe_inject_tenant_column(&self, query: &InsertQuery) -> RedDBResult<Option<InsertQuery>> {
74        let Some(tenant_col) = self.tenant_column(&query.table) else {
75            return Ok(None);
76        };
77        // User already named the column (literal match) — trust them.
78        if query
79            .columns
80            .iter()
81            .any(|c| c.eq_ignore_ascii_case(&tenant_col))
82        {
83            return Ok(None);
84        }
85
86        // Phase 2 PG parity: dotted-path tenancy. When `tenant_col` is a
87        // nested key like `headers.tenant` we operate on the root
88        // column (`headers`) and set / add the nested path inside its
89        // JSON value. If the user named the root column we mutate in
90        // place; otherwise we create a fresh JSON column for every row.
91        if let Some(dot_pos) = tenant_col.find('.') {
92            let (root, tail) = tenant_col.split_at(dot_pos);
93            let tail = &tail[1..]; // drop leading '.'
94            return self.inject_dotted_tenant(query, root, tail);
95        }
96
97        let Some(tenant_id) = crate::runtime::impl_core::current_tenant() else {
98            return Err(RedDBError::Query(format!(
99                "INSERT into tenant-scoped table '{}' requires an active tenant — \
100                 run SET TENANT '<id>' first or name column '{}' explicitly",
101                query.table, tenant_col
102            )));
103        };
104
105        let mut augmented = query.clone();
106        augmented.columns.push(tenant_col);
107        let lit = Value::text(tenant_id.clone());
108        for row in augmented.values.iter_mut() {
109            row.push(lit.clone());
110        }
111        for row in augmented.value_exprs.iter_mut() {
112            row.push(crate::storage::query::ast::Expr::Literal {
113                value: lit.clone(),
114                span: crate::storage::query::ast::Span::synthetic(),
115            });
116        }
117        Ok(Some(augmented))
118    }
119
    /// Dotted-path auto-fill — set `root.tail` to `CURRENT_TENANT()` on
    /// every row. Mirrors `maybe_inject_tenant_column` but mutates
    /// nested JSON instead of appending a flat column.
    ///
    /// Cases:
    /// * Root column already in the INSERT list → mutate per-row JSON
    ///   (parse, set path, re-serialize).
    /// * Root column absent → create a fresh `{tail: tenant}` JSON
    ///   object and append the root column to the INSERT.
129    fn inject_dotted_tenant(
130        &self,
131        query: &InsertQuery,
132        root: &str,
133        tail: &str,
134    ) -> RedDBResult<Option<InsertQuery>> {
135        let active_tenant = crate::runtime::impl_core::current_tenant();
136        let mut augmented = query.clone();
137        let root_idx = augmented
138            .columns
139            .iter()
140            .position(|c| c.eq_ignore_ascii_case(root));
141
142        if let Some(idx) = root_idx {
143            // User supplied the root column. Per-row: if the dotted
144            // tail is already present we trust the user (admin / bulk
145            // loader scenario); otherwise fill from the active
146            // tenant. An unbound tenant is only an error when some
147            // row actually needs filling.
148            for row in augmented.values.iter_mut() {
149                let Some(slot) = row.get_mut(idx) else {
150                    continue;
151                };
152                if dotted_tail_already_set(slot, tail) {
153                    continue;
154                }
155                let Some(tenant_id) = &active_tenant else {
156                    return Err(RedDBError::Query(format!(
157                        "INSERT into tenant-scoped table '{}' requires an active tenant — \
158                         run SET TENANT '<id>' first or set '{}.{}' explicitly in each row",
159                        query.table, root, tail
160                    )));
161                };
162                *slot = merge_dotted_tenant(slot.clone(), tail, tenant_id)?;
163            }
164            // Expression row is kept in sync by re-wrapping the
165            // mutated literal; the canonical path will re-evaluate
166            // against the same JSON shape.
167            for (row_idx, row) in augmented.value_exprs.iter_mut().enumerate() {
168                if let Some(slot) = row.get_mut(idx) {
169                    let new_value = augmented
170                        .values
171                        .get(row_idx)
172                        .and_then(|v| v.get(idx))
173                        .cloned()
174                        .unwrap_or(Value::Null);
175                    *slot = crate::storage::query::ast::Expr::Literal {
176                        value: new_value,
177                        span: crate::storage::query::ast::Span::synthetic(),
178                    };
179                }
180            }
181        } else {
182            // No root column in the INSERT list — auto-fill needs a
183            // bound tenant to synthesise one. Error loud so we never
184            // create a tenant-less row that RLS would then hide.
185            let Some(tenant_id) = &active_tenant else {
186                return Err(RedDBError::Query(format!(
187                    "INSERT into tenant-scoped table '{}' requires an active tenant — \
188                     run SET TENANT '<id>' first or name path '{}.{}' explicitly",
189                    query.table, root, tail
190                )));
191            };
192            // Create a fresh JSON column with only the tenant path set.
193            augmented.columns.push(root.to_string());
194            let fresh = merge_dotted_tenant(Value::Null, tail, tenant_id)?;
195            for row in augmented.values.iter_mut() {
196                row.push(fresh.clone());
197            }
198            for row in augmented.value_exprs.iter_mut() {
199                row.push(crate::storage::query::ast::Expr::Literal {
200                    value: fresh.clone(),
201                    span: crate::storage::query::ast::Span::synthetic(),
202                });
203            }
204        }
205
206        Ok(Some(augmented))
207    }
208
    /// Returns `(affected_count, lsns)`. For the txn (xmax-stamp) path,
    /// `lsns` is empty — events fire at commit time (not yet implemented).
211    fn delete_entities_batch(
212        &self,
213        collection: &str,
214        ids: &[EntityId],
215    ) -> RedDBResult<(u64, Vec<u64>)> {
216        if ids.is_empty() {
217            return Ok((0, vec![]));
218        }
219
220        // Phase 2.3.2b: when the DELETE is running inside a BEGIN-wrapped
221        // transaction, stamp xmax instead of physically removing the
222        // tuple so concurrent snapshots keep seeing the row until this
223        // txn commits. COMMIT drains `pending_tombstones` and does the
224        // real delete_batch; ROLLBACK resets xmax back to 0.
225        if let Some(xid) = self.current_xid() {
226            let store = self.db().store();
227            let Some(manager) = store.get_collection(collection) else {
228                return Ok((0, vec![]));
229            };
230            let conn_id = crate::runtime::impl_core::current_connection_id();
231            let mut marked: u64 = 0;
232            for &id in ids {
233                let Some(mut entity) = manager.get(id) else {
234                    continue;
235                };
236                // Skip if this tuple was already tombstoned by a prior
237                // statement in the same txn — idempotent DELETE.
238                if entity.xmax != 0 {
239                    continue;
240                }
241                entity.set_xmax(xid);
242                if manager.update(entity).is_ok() {
243                    self.record_pending_tombstone(conn_id, collection, id, xid);
244                    marked += 1;
245                }
246            }
247            return Ok((marked, vec![]));
248        }
249
250        let store = self.db().store();
251        let deleted_ids = store
252            .delete_batch(collection, ids)
253            .map_err(|err| RedDBError::Internal(err.to_string()))?;
254        if deleted_ids.is_empty() {
255            return Ok((0, vec![]));
256        }
257
258        let mut lsns = Vec::with_capacity(deleted_ids.len());
259        for id in &deleted_ids {
260            store.context_index().remove_entity(*id);
261            let lsn = self.cdc_emit(
262                crate::replication::cdc::ChangeOperation::Delete,
263                collection,
264                id.raw(),
265                "entity",
266            );
267            lsns.push(lsn);
268        }
269
270        Ok((deleted_ids.len() as u64, lsns))
271    }
272
    /// Flushes context-index updates and CDC for each applied mutation.
    /// Returns one LSN per entity in the same order as `applied`.
275    fn flush_update_chunk(&self, applied: &[AppliedEntityMutation]) -> Vec<u64> {
276        if applied.is_empty() {
277            return Vec::new();
278        }
279
280        let store = self.db().store();
281        if applied.iter().any(|item| item.context_index_dirty) {
282            store.context_index().index_entities(
283                &applied[0].collection,
284                applied
285                    .iter()
286                    .filter(|item| item.context_index_dirty)
287                    .map(|item| &item.entity),
288            );
289        }
290
291        let mut lsns = Vec::with_capacity(applied.len());
292        for item in applied {
293            let lsn = self.cdc_emit_prebuilt(
294                crate::replication::cdc::ChangeOperation::Update,
295                &item.collection,
296                &item.entity,
297                "entity",
298                item.metadata.as_ref(),
299                false,
300            );
301            lsns.push(lsn);
302        }
303        lsns
304    }
305
306    fn persist_update_chunk(&self, applied: &[AppliedEntityMutation]) -> RedDBResult<()> {
307        self.persist_applied_entity_mutations(applied)
308    }
309
    /// Execute `INSERT INTO table [entity_type] (cols) VALUES (vals), ...`
    ///
    /// Each row in `query.values` is zipped with `query.columns` to produce a
    /// set of named fields, which is then dispatched based on `entity_type`.
314    pub fn execute_insert(
315        &self,
316        raw_query: &str,
317        query: &InsertQuery,
318    ) -> RedDBResult<RuntimeQueryResult> {
319        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
320        // CollectionContract gate (#49): single entry point for the
321        // operator's collection-level write rules. Today this is a
322        // no-op for INSERT (APPEND ONLY permits insert); routing
323        // through the gate now means future contract bits — versioned,
324        // vault-only writes — plug in once instead of per verb.
325        crate::runtime::collection_contract::CollectionContractGate::check(
326            self,
327            &query.table,
328            crate::runtime::collection_contract::MutationKind::Insert,
329        )?;
330        // Phase 2.5.4 table-scoped tenancy: if the target table is
331        // tenant-scoped and the user didn't name the tenant column,
332        // auto-inject it with the thread-local `CURRENT_TENANT()`
333        // value. When the column is named explicitly we trust the
334        // caller (useful for admin tooling that writes on behalf of
335        // specific tenants). An unbound tenant on an implicit-fill
336        // path errors up front rather than producing a row the RLS
337        // policy would silently hide.
338        let augmented_owned;
339        let query = match self.maybe_inject_tenant_column(query)? {
340            Some(new_q) => {
341                augmented_owned = new_q;
342                &augmented_owned
343            }
344            None => query,
345        };
346        self.check_insert_column_policy(query)?;
347
348        let mut inserted_count: u64 = 0;
349        let effective_rows =
350            effective_insert_rows(query).map_err(|msg| RedDBError::Query(msg.to_string()))?;
351
352        // Ensure the collection exists (auto-create on first insert).
353        let store = self.inner.db.store();
354        let _ = store.get_or_create_collection(&query.table);
355        let declared_model = self
356            .db()
357            .collection_contract_arc(&query.table)
358            .map(|contract| contract.declared_model);
359
360        let mut returning_snapshots: Option<Vec<Vec<(String, Value)>>> =
361            if query.returning.is_some() {
362                Some(Vec::with_capacity(effective_rows.len()))
363            } else {
364                None
365            };
366        let mut returning_result: Option<UnifiedResult> = None;
367
368        if matches!(query.entity_type, InsertEntityType::Row)
369            && !matches!(
370                declared_model,
371                Some(crate::catalog::CollectionModel::TimeSeries)
372            )
373        {
374            let mut rows = Vec::with_capacity(effective_rows.len());
375            for row_values in &effective_rows {
376                if row_values.len() != query.columns.len() {
377                    return Err(RedDBError::Query(format!(
378                        "INSERT column count ({}) does not match value count ({})",
379                        query.columns.len(),
380                        row_values.len()
381                    )));
382                }
383                let (fields, mut metadata) =
384                    split_insert_metadata(self, &query.columns, row_values)?;
385                merge_with_clauses(
386                    &mut metadata,
387                    query.ttl_ms,
388                    query.expires_at_ms,
389                    &query.with_metadata,
390                );
391                if let Some(snaps) = returning_snapshots.as_mut() {
392                    snaps.push(fields.clone());
393                }
394                rows.push(CreateRowInput {
395                    collection: query.table.clone(),
396                    fields,
397                    metadata,
398                    node_links: Vec::new(),
399                    vector_links: Vec::new(),
400                });
401            }
402            let outputs = self.create_rows_batch(CreateRowsBatchInput {
403                collection: query.table.clone(),
404                rows,
405                suppress_events: query.suppress_events,
406            })?;
407            inserted_count = outputs.len() as u64;
408
409            // Hypertable chunk routing: if this table was declared via
410            // CREATE HYPERTABLE, register each row's time-column value
411            // with the registry so chunk metadata (bounds, row counts,
412            // TTL eligibility) stays current. This is what lets
413            // HYPERTABLE_PRUNE_CHUNKS answer real questions + lets the
414            // retention daemon sweep expired chunks without scanning
415            // every row.
416            if let Some(spec) = self.inner.db.hypertables().get(&query.table) {
417                let time_col = &spec.time_column;
418                // Find the column's index in the INSERT column list.
419                if let Some(idx) = query.columns.iter().position(|c| c == time_col) {
420                    for row in &effective_rows {
421                        if let Some(Value::Integer(n) | Value::BigInt(n)) = row.get(idx) {
422                            if *n >= 0 {
423                                let _ = self.inner.db.hypertables().route(&query.table, *n as u64);
424                            }
425                        } else if let Some(Value::UnsignedInteger(n)) = row.get(idx) {
426                            let _ = self.inner.db.hypertables().route(&query.table, *n);
427                        }
428                    }
429                }
430            }
431
432            if let (Some(items), Some(snaps)) =
433                (query.returning.as_ref(), returning_snapshots.take())
434            {
435                returning_result = Some(build_returning_result(items, &snaps, Some(&outputs)));
436            }
437        } else {
438            if query.returning.is_some() {
439                return Err(RedDBError::Query(
440                    "RETURNING is not yet supported for this INSERT path (only plain table rows)"
441                        .to_string(),
442                ));
443            }
444            for row_values in &effective_rows {
445                if row_values.len() != query.columns.len() {
446                    return Err(RedDBError::Query(format!(
447                        "INSERT column count ({}) does not match value count ({})",
448                        query.columns.len(),
449                        row_values.len()
450                    )));
451                }
452
453                match query.entity_type {
454                    InsertEntityType::Row => {
455                        let (fields, mut metadata) =
456                            split_insert_metadata(self, &query.columns, row_values)?;
457                        merge_with_clauses(
458                            &mut metadata,
459                            query.ttl_ms,
460                            query.expires_at_ms,
461                            &query.with_metadata,
462                        );
463                        self.insert_timeseries_point(&query.table, fields, metadata)?;
464                    }
465                    InsertEntityType::Node => {
466                        let (node_values, mut metadata) =
467                            split_insert_metadata(self, &query.columns, row_values)?;
468                        merge_with_clauses(
469                            &mut metadata,
470                            query.ttl_ms,
471                            query.expires_at_ms,
472                            &query.with_metadata,
473                        );
474                        ensure_non_tree_reserved_metadata_entries(&metadata)?;
475                        let (columns, values) = pairwise_columns_values(&node_values);
476                        let label = find_column_value_string(&columns, &values, "label")?;
477                        let node_type =
478                            find_column_value_opt_string(&columns, &values, "node_type");
479                        let properties = extract_remaining_properties(
480                            &columns,
481                            &values,
482                            &["label", "node_type"],
483                        );
484                        let input = CreateNodeInput {
485                            collection: query.table.clone(),
486                            label,
487                            node_type,
488                            properties,
489                            metadata,
490                            embeddings: Vec::new(),
491                            table_links: Vec::new(),
492                            node_links: Vec::new(),
493                        };
494                        self.create_node(input)?;
495                    }
496                    InsertEntityType::Edge => {
497                        let (edge_values, mut metadata) =
498                            split_insert_metadata(self, &query.columns, row_values)?;
499                        merge_with_clauses(
500                            &mut metadata,
501                            query.ttl_ms,
502                            query.expires_at_ms,
503                            &query.with_metadata,
504                        );
505                        ensure_non_tree_reserved_metadata_entries(&metadata)?;
506                        let (columns, values) = pairwise_columns_values(&edge_values);
507                        let label = find_column_value_string(&columns, &values, "label")?;
508                        ensure_non_tree_structural_edge_label(&label)?;
509                        let from_id = find_column_value_u64(&columns, &values, "from")?;
510                        let to_id = find_column_value_u64(&columns, &values, "to")?;
511                        let weight = find_column_value_f32_opt(&columns, &values, "weight");
512                        let properties = extract_remaining_properties(
513                            &columns,
514                            &values,
515                            &["label", "from", "to", "weight"],
516                        );
517                        let input = CreateEdgeInput {
518                            collection: query.table.clone(),
519                            label,
520                            from: EntityId::new(from_id),
521                            to: EntityId::new(to_id),
522                            weight,
523                            properties,
524                            metadata,
525                        };
526                        self.create_edge(input)?;
527                    }
528                    InsertEntityType::Vector => {
529                        let (vector_values, mut metadata) =
530                            split_insert_metadata(self, &query.columns, row_values)?;
531                        merge_with_clauses(
532                            &mut metadata,
533                            query.ttl_ms,
534                            query.expires_at_ms,
535                            &query.with_metadata,
536                        );
537                        let (columns, values) = pairwise_columns_values(&vector_values);
538                        let dense = find_column_value_vec_f32(&columns, &values, "dense")?;
539                        let content = find_column_value_opt_string(&columns, &values, "content");
540                        let input = CreateVectorInput {
541                            collection: query.table.clone(),
542                            dense,
543                            content,
544                            metadata,
545                            link_row: None,
546                            link_node: None,
547                        };
548                        self.create_vector(input)?;
549                    }
550                    InsertEntityType::Document => {
551                        let (document_values, mut metadata) =
552                            split_insert_metadata(self, &query.columns, row_values)?;
553                        merge_with_clauses(
554                            &mut metadata,
555                            query.ttl_ms,
556                            query.expires_at_ms,
557                            &query.with_metadata,
558                        );
559                        let (columns, values) = pairwise_columns_values(&document_values);
560                        let body_str = find_column_value_string(&columns, &values, "body")?;
561                        let body: crate::json::Value = crate::json::from_str(&body_str)
562                            .map_err(|e| RedDBError::Query(format!("invalid JSON body: {e}")))?;
563                        let input = CreateDocumentInput {
564                            collection: query.table.clone(),
565                            body,
566                            metadata,
567                            node_links: Vec::new(),
568                            vector_links: Vec::new(),
569                        };
570                        self.create_document(input)?;
571                    }
572                    InsertEntityType::Kv => {
573                        let (kv_values, mut metadata) =
574                            split_insert_metadata(self, &query.columns, row_values)?;
575                        merge_with_clauses(
576                            &mut metadata,
577                            query.ttl_ms,
578                            query.expires_at_ms,
579                            &query.with_metadata,
580                        );
581                        let (columns, values) = pairwise_columns_values(&kv_values);
582                        let key = find_column_value_string(&columns, &values, "key")?;
583                        let value = find_column_value(&columns, &values, "value")?;
584                        let input = CreateKvInput {
585                            collection: query.table.clone(),
586                            key,
587                            value,
588                            metadata,
589                        };
590                        self.create_kv(input)?;
591                    }
592                }
593
594                inserted_count += 1;
595            }
596        }
597
598        // Auto-embed pipeline: batch-embed fields across all inserted rows via AiBatchClient.
599        if let Some(ref embed_config) = query.auto_embed {
600            let store = self.inner.db.store();
601            let provider = crate::ai::parse_provider(&embed_config.provider)?;
602            let api_key = crate::ai::resolve_api_key_from_runtime(&provider, None, self)?;
603            let model = embed_config.model.clone().unwrap_or_else(|| {
604                std::env::var("REDDB_OPENAI_EMBEDDING_MODEL")
605                    .ok()
606                    .unwrap_or_else(|| crate::ai::DEFAULT_OPENAI_EMBEDDING_MODEL.to_string())
607            });
608
609            // Collect the just-inserted rows (most-recently appended, reversed back to insert order).
610            let manager = store
611                .get_collection(&query.table)
612                .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
613            let entities = manager.query_all(|_| true);
614            let recent: Vec<_> = entities
615                .into_iter()
616                .rev()
617                .take(effective_rows.len())
618                .collect();
619
620            // Collector phase: (entity_index, combined_text) for rows that have non-empty fields.
621            let entity_combos: Vec<(usize, String)> = recent
622                .iter()
623                .enumerate()
624                .filter_map(|(i, entity)| {
625                    if let EntityData::Row(ref row) = entity.data {
626                        if let Some(ref named) = row.named {
627                            let texts: Vec<String> = embed_config
628                                .fields
629                                .iter()
630                                .filter_map(|field| match named.get(field) {
631                                    Some(Value::Text(t)) if !t.is_empty() => Some(t.to_string()),
632                                    _ => None,
633                                })
634                                .collect();
635                            if !texts.is_empty() {
636                                return Some((i, texts.join(" ")));
637                            }
638                        }
639                    }
640                    None
641                })
642                .collect();
643
644            if !entity_combos.is_empty() {
645                // Batch phase: single provider round-trip for all rows.
646                let batch_texts: Vec<String> =
647                    entity_combos.iter().map(|(_, t)| t.clone()).collect();
648
649                let batch_client =
650                    crate::runtime::ai::batch_client::AiBatchClient::from_runtime(self);
651
652                let embeddings = match tokio::runtime::Handle::try_current() {
653                    Ok(handle) => tokio::task::block_in_place(|| {
654                        handle.block_on(batch_client.embed_batch(
655                            &provider,
656                            &model,
657                            &api_key,
658                            batch_texts,
659                        ))
660                    }),
661                    Err(_) => {
662                        return Err(RedDBError::Query(
663                            "AUTO EMBED requires a Tokio runtime context".to_string(),
664                        ));
665                    }
666                }
667                .map_err(|e| RedDBError::Query(e.to_string()))?;
668
669                // Distribute phase: persist one vector per non-empty embedding.
670                for ((_, combined), dense) in entity_combos.iter().zip(embeddings) {
671                    if dense.is_empty() {
672                        continue;
673                    }
674                    self.create_vector(CreateVectorInput {
675                        collection: query.table.clone(),
676                        dense,
677                        content: Some(combined.clone()),
678                        metadata: Vec::new(),
679                        link_row: None,
680                        link_node: None,
681                    })?;
682                }
683            }
684        }
685
686        if inserted_count > 0 {
687            self.note_table_write(&query.table);
688        }
689
690        let mut result = RuntimeQueryResult::dml_result(
691            raw_query.to_string(),
692            inserted_count,
693            "insert",
694            "runtime-dml",
695        );
696        if let Some(returning) = returning_result {
697            result.result = returning;
698        }
699        Ok(result)
700    }
701
702    fn check_insert_column_policy(&self, query: &InsertQuery) -> RedDBResult<()> {
703        let Some(auth_store) = self.inner.auth_store.read().clone() else {
704            return Ok(());
705        };
706        if !auth_store.iam_authorization_enabled() {
707            return Ok(());
708        }
709        let Some((username, role)) = crate::runtime::impl_core::current_auth_identity() else {
710            return Ok(());
711        };
712
713        let tenant = crate::runtime::impl_core::current_tenant();
714        let principal = crate::auth::UserId::from_parts(tenant.as_deref(), &username);
715        let request = crate::auth::ColumnAccessRequest {
716            action: "insert".to_string(),
717            schema: None,
718            table: query.table.clone(),
719            columns: query.columns.clone(),
720        };
721        let ctx = crate::auth::policies::EvalContext {
722            principal_tenant: tenant.clone(),
723            current_tenant: tenant,
724            peer_ip: None,
725            mfa_present: false,
726            now_ms: crate::auth::now_ms(),
727            principal_is_admin_role: role == crate::auth::Role::Admin,
728        };
729
730        let outcome = auth_store.check_column_projection_authz(&principal, &request, &ctx);
731        let table_allowed = matches!(
732            outcome.table_decision,
733            crate::auth::policies::Decision::Allow { .. }
734                | crate::auth::policies::Decision::AdminBypass
735        );
736        if !table_allowed {
737            return Err(RedDBError::Query(format!(
738                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
739                outcome.table_resource.kind, outcome.table_resource.name
740            )));
741        }
742        if let Some(denied) = outcome.first_denied_column() {
743            return Err(RedDBError::Query(format!(
744                "principal=`{username}` action=`insert` resource=`{}:{}` denied by IAM policy",
745                denied.resource.kind, denied.resource.name
746            )));
747        }
748
749        Ok(())
750    }
751
752    pub(crate) fn insert_timeseries_point(
753        &self,
754        collection: &str,
755        fields: Vec<(String, Value)>,
756        mut metadata: Vec<(String, MetadataValue)>,
757    ) -> RedDBResult<EntityId> {
758        apply_collection_default_ttl_metadata(self, collection, &mut metadata);
759
760        let (columns, values) = pairwise_columns_values(&fields);
761        validate_timeseries_insert_columns(&columns)?;
762
763        let metric = find_column_value_string(&columns, &values, "metric")?;
764        let value = find_column_value_f64(&columns, &values, "value")?;
765        let timestamp_ns =
766            find_timeseries_timestamp_ns(&columns, &values)?.unwrap_or_else(current_unix_ns);
767        let tags = find_timeseries_tags(&columns, &values)?;
768
769        let mut entity = UnifiedEntity::new(
770            EntityId::new(0),
771            EntityKind::TimeSeriesPoint(Box::new(crate::storage::TimeSeriesPointKind {
772                series: collection.to_string(),
773                metric: metric.clone(),
774            })),
775            EntityData::TimeSeries(crate::storage::TimeSeriesData {
776                metric,
777                timestamp_ns,
778                value,
779                tags,
780            }),
781        );
782        // MVCC #30: stamp xmin with the active tx xid (inside a tx)
783        // or an autocommit xid (allocated and committed up-front so
784        // future snapshots see the row as soon as it lands).
785        let writer_xid = match self.current_xid() {
786            Some(xid) => xid,
787            None => {
788                let mgr = self.snapshot_manager();
789                let xid = mgr.begin();
790                mgr.commit(xid);
791                xid
792            }
793        };
794        entity.set_xmin(writer_xid);
795
796        let store = self.inner.db.store();
797        let id = store
798            .insert_auto(collection, entity)
799            .map_err(|err| RedDBError::Internal(err.to_string()))?;
800
801        if !metadata.is_empty() {
802            let _ = store.set_metadata(
803                collection,
804                id,
805                Metadata::with_fields(metadata.into_iter().collect()),
806            );
807        }
808
809        self.cdc_emit(
810            crate::replication::cdc::ChangeOperation::Insert,
811            collection,
812            id.raw(),
813            "timeseries",
814        );
815
816        Ok(id)
817    }
818
    /// Execute UPDATE table SET col=val, ... WHERE filter
    ///
    /// Scans the target collection, evaluates the WHERE filter against each
    /// record, and patches every matching entity.
    ///
    /// Order of operations: write gate → collection-contract gate → RLS
    /// augmentation → (optional) RETURNING wrapper → inner executor.
    ///
    /// # Errors
    /// Fails when writes are gated off, when the collection contract
    /// rejects UPDATE (e.g. APPEND ONLY), or when the inner executor
    /// reports an error.
    pub fn execute_update(
        &self,
        raw_query: &str,
        query: &UpdateQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // CollectionContract gate (#50): runs the APPEND ONLY guard
        // (and any future contract bits) before RLS / RETURNING work
        // so the operator's immutability declaration is honoured
        // uniformly and the error message points at the DDL rather
        // than at a downstream symptom.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Update,
        )?;

        // Apply RLS augmentation first so every downstream path — plain
        // UPDATE, UPDATE...RETURNING, the inner scan — observes the
        // same policy-filtered target set. This prevents RETURNING
        // from ever exposing rows the UPDATE policy would have
        // denied.
        let rls_gated = crate::runtime::impl_core::rls_is_enabled(self, &query.table);
        let augmented_query: UpdateQuery;
        let effective_query: &UpdateQuery = if rls_gated {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Update,
            );
            let Some(policy) = rls_filter else {
                // No admitting policy: zero rows affected, empty
                // RETURNING (never leak rows the caller can't touch).
                let mut response = RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "update",
                    "runtime-dml-rls",
                );
                if let Some(items) = query.returning.clone() {
                    response.result = build_returning_result(&items, &[], None);
                }
                return Ok(response);
            };
            // AND the policy predicate into the caller's WHERE so the
            // scan below only ever visits policy-admitted rows.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            augmented_query = augmented;
            &augmented_query
        } else {
            query
        };

        // RETURNING wraps the inner executor and uses the touched-id
        // list the inner reports so the post-image reflects exactly
        // the rows the UPDATE actually mutated (not whatever a
        // separate SELECT might have observed).
        if let Some(items) = effective_query.returning.clone() {
            let mut inner_query = effective_query.clone();
            inner_query.returning = None;
            let (mut response, touched_ids) =
                self.execute_update_inner_tracked(raw_query, &inner_query)?;

            // Re-read only the touched rows to build the post-image.
            let snapshots = super::dml_target_scan::DmlTargetScan::new(
                self,
                &effective_query.table,
                None,
                None,
            )
            .row_snapshots(&touched_ids);

            response.result = build_returning_result(&items, &snapshots, None);
            response.engine = "runtime-dml-returning";
            return Ok(response);
        }

        self.execute_update_inner(raw_query, effective_query)
    }
905
906    /// Back-compat shim: the older entry point ignored touched ids.
907    fn execute_update_inner(
908        &self,
909        raw_query: &str,
910        query: &UpdateQuery,
911    ) -> RedDBResult<RuntimeQueryResult> {
912        self.execute_update_inner_tracked(raw_query, query)
913            .map(|(res, _)| res)
914    }
915
    /// Core UPDATE executor: compiles the SET list once, finds the target
    /// ids, applies the plan chunk-by-chunk, and returns both the DML
    /// result and the list of entity ids actually mutated (consumed by
    /// the RETURNING wrapper in `execute_update`).
    ///
    /// Per chunk the order is: materialize + apply in memory → persist →
    /// flush (collect WAL LSNs) → emit change events (unless suppressed).
    fn execute_update_inner_tracked(
        &self,
        raw_query: &str,
        query: &UpdateQuery,
    ) -> RedDBResult<(RuntimeQueryResult, Vec<EntityId>)> {
        let store = self.inner.db.store();
        let effective_filter = effective_update_filter(query);
        // Compile once; the per-row loop only evaluates the dynamic
        // (non-constant) assignments.
        let compiled_plan = self.compile_update_plan(query)?;
        let mut touched_ids: Vec<EntityId> = Vec::new();
        let limit_cap = query.limit.map(|l| l as usize);
        let manager = store
            .get_collection(&query.table)
            .ok_or_else(|| RedDBError::NotFound(query.table.clone()))?;
        let ids_to_update = super::dml_target_scan::DmlTargetScan::new(
            self,
            &query.table,
            effective_filter.as_ref(),
            limit_cap,
        )
        .find_target_ids()?;

        let mut affected: u64 = 0;
        for chunk in ids_to_update.chunks(UPDATE_APPLY_CHUNK_SIZE) {
            let mut applied_chunk = Vec::with_capacity(chunk.len());
            // `get_many` may yield gaps for ids deleted since the scan;
            // `flatten` skips them, so `affected` counts real mutations.
            for entity in manager.get_many(chunk).into_iter().flatten() {
                let assignments =
                    self.materialize_update_assignments_for_entity(query, &entity, &compiled_plan)?;
                let applied = self.apply_materialized_update_for_entity(
                    query.table.clone(),
                    entity,
                    &compiled_plan,
                    assignments,
                )?;
                touched_ids.push(applied.id);
                applied_chunk.push(applied);
            }
            self.persist_update_chunk(&applied_chunk)?;
            affected += applied_chunk.len() as u64;
            let lsns = self.flush_update_chunk(&applied_chunk);
            if !query.suppress_events {
                self.emit_update_events_for_collection(&query.table, &applied_chunk, &lsns)?;
            }
        }

        if affected > 0 {
            self.note_table_write(&query.table);
        }

        Ok((
            RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "update",
                "runtime-dml",
            ),
            touched_ids,
        ))
    }
974
975    fn compile_update_plan(&self, query: &UpdateQuery) -> RedDBResult<CompiledUpdatePlan> {
976        let mut static_field_assignments = Vec::new();
977        let mut static_metadata_assignments = Vec::new();
978        let mut dynamic_assignments = Vec::new();
979        let row_contract_plan = build_row_update_contract_plan(&self.db(), &query.table)?;
980        let mut row_modified_columns = Vec::new();
981
982        for (column, expr) in &query.assignment_exprs {
983            let metadata_key = resolve_sql_ttl_metadata_key(column);
984            if let Ok(value) = fold_expr_to_value(expr.clone()) {
985                if let Some(metadata_key) = metadata_key {
986                    let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
987                    let (canonical_key, canonical_value) =
988                        canonicalize_sql_ttl_metadata(metadata_key, raw_value);
989                    static_metadata_assignments.push((canonical_key.to_string(), canonical_value));
990                } else {
991                    let value = self.resolve_crypto_sentinel(value)?;
992                    static_field_assignments.push((
993                        column.clone(),
994                        normalize_row_update_assignment_with_plan(
995                            &query.table,
996                            column,
997                            value,
998                            row_contract_plan.as_ref(),
999                        )?,
1000                    ));
1001                    row_modified_columns.push(column.clone());
1002                }
1003                continue;
1004            }
1005
1006            dynamic_assignments.push(CompiledUpdateAssignment {
1007                column: column.clone(),
1008                expr: expr.clone(),
1009                metadata_key,
1010                row_rule: if metadata_key.is_none() {
1011                    if let Some(plan) = row_contract_plan.as_ref() {
1012                        if plan.timestamps_enabled
1013                            && (column == "created_at" || column == "updated_at")
1014                        {
1015                            return Err(RedDBError::Query(format!(
1016                                "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1017                                query.table, column
1018                            )));
1019                        }
1020                        if let Some(rule) = plan.declared_rules.get(column) {
1021                            Some(rule.clone())
1022                        } else if plan.strict_schema {
1023                            return Err(RedDBError::Query(format!(
1024                                "collection '{}' is strict and does not allow undeclared fields: {}",
1025                                query.table, column
1026                            )));
1027                        } else {
1028                            None
1029                        }
1030                    } else {
1031                        None
1032                    }
1033                } else {
1034                    None
1035                },
1036            });
1037            if metadata_key.is_none() {
1038                row_modified_columns.push(column.clone());
1039            }
1040        }
1041
1042        let row_modified_columns = dedupe_update_columns(row_modified_columns);
1043        let row_touches_unique_columns = row_contract_plan.as_ref().is_some_and(|plan| {
1044            row_modified_columns.iter().any(|column| {
1045                plan.unique_columns
1046                    .keys()
1047                    .any(|unique| unique.eq_ignore_ascii_case(column))
1048            })
1049        });
1050
1051        if let Some(ttl_ms) = query.ttl_ms {
1052            static_metadata_assignments
1053                .push(("_ttl_ms".to_string(), metadata_u64_to_value(ttl_ms)));
1054        }
1055        if let Some(expires_at_ms) = query.expires_at_ms {
1056            static_metadata_assignments.push((
1057                "_expires_at".to_string(),
1058                metadata_u64_to_value(expires_at_ms),
1059            ));
1060        }
1061        for (key, val) in &query.with_metadata {
1062            static_metadata_assignments.push((key.clone(), storage_value_to_metadata_value(val)));
1063        }
1064
1065        Ok(CompiledUpdatePlan {
1066            static_field_assignments,
1067            static_metadata_assignments,
1068            dynamic_assignments,
1069            row_contract_plan,
1070            row_modified_columns,
1071            row_touches_unique_columns,
1072        })
1073    }
1074
    /// Evaluate the compiled plan's dynamic (non-constant) SET
    /// expressions against one entity, producing per-row field and
    /// metadata assignments.
    ///
    /// The runtime record view of the entity is materialized lazily —
    /// only when at least one dynamic assignment exists — and reused for
    /// every subsequent expression. Errors if the record cannot be
    /// materialized or if an expression fails to evaluate.
    fn materialize_update_assignments_for_entity(
        &self,
        query: &UpdateQuery,
        entity: &UnifiedEntity,
        compiled_plan: &CompiledUpdatePlan,
    ) -> RedDBResult<MaterializedUpdateAssignments> {
        let mut assignments = MaterializedUpdateAssignments::default();
        let mut record: Option<UnifiedRecord> = None;

        for assignment in &compiled_plan.dynamic_assignments {
            // Build the record view on first need only; no dynamic
            // assignments means no materialization cost at all.
            if record.is_none() {
                record = runtime_any_record_from_entity_ref(entity);
            }
            let Some(record) = record.as_ref() else {
                return Err(RedDBError::Query(format!(
                    "UPDATE could not materialize runtime record for entity {} in '{}'",
                    entity.id.raw(),
                    query.table
                )));
            };
            let value = super::expr_eval::evaluate_runtime_expr_with_db(
                Some(self.inner.db.as_ref()),
                &assignment.expr,
                record,
                Some(query.table.as_str()),
                Some(query.table.as_str()),
            )
            .ok_or_else(|| {
                RedDBError::Query(format!(
                    "failed to evaluate UPDATE expression for column '{}'",
                    assignment.column
                ))
            })?;

            if let Some(metadata_key) = assignment.metadata_key {
                // TTL-style assignment: canonicalize into metadata rather
                // than writing a row field.
                let raw_value = sql_literal_to_metadata_value(metadata_key, &value)?;
                let (canonical_key, canonical_value) =
                    canonicalize_sql_ttl_metadata(metadata_key, raw_value);
                assignments
                    .dynamic_metadata_assignments
                    .push((canonical_key.to_string(), canonical_value));
            } else {
                assignments.dynamic_field_assignments.push((
                    assignment.column.clone(),
                    // Normalize through the contract rule resolved at
                    // compile time (if any).
                    normalize_row_update_value_for_rule(
                        &query.table,
                        self.resolve_crypto_sentinel(value)?,
                        assignment.row_rule.as_ref(),
                    )?,
                ));
            }
        }

        Ok(assignments)
    }
1130
1131    fn apply_materialized_update_for_entity(
1132        &self,
1133        collection: String,
1134        entity: UnifiedEntity,
1135        compiled_plan: &CompiledUpdatePlan,
1136        assignments: MaterializedUpdateAssignments,
1137    ) -> RedDBResult<AppliedEntityMutation> {
1138        if matches!(entity.data, EntityData::Row(_)) {
1139            return self.apply_loaded_sql_update_row_core(
1140                collection,
1141                entity,
1142                &compiled_plan.static_field_assignments,
1143                assignments.dynamic_field_assignments,
1144                &compiled_plan.static_metadata_assignments,
1145                assignments.dynamic_metadata_assignments,
1146                compiled_plan.row_contract_plan.as_ref(),
1147                &compiled_plan.row_modified_columns,
1148                compiled_plan.row_touches_unique_columns,
1149            );
1150        }
1151
1152        self.apply_loaded_patch_entity_core(
1153            collection,
1154            entity,
1155            crate::json::Value::Null,
1156            build_patch_operations_from_materialized_assignments(compiled_plan, assignments),
1157        )
1158    }
1159
    /// Execute DELETE FROM table WHERE filter
    ///
    /// Order of operations: write gate → collection-contract gate →
    /// (optional) RETURNING pre-image capture → RLS policy folding →
    /// inner chunked delete.
    pub fn execute_delete(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        // CollectionContract gate (#50) — see execute_update for
        // rationale. The gate handles APPEND ONLY rejection and is
        // the single point where future contract bits land.
        crate::runtime::collection_contract::CollectionContractGate::check(
            self,
            &query.table,
            crate::runtime::collection_contract::MutationKind::Delete,
        )?;

        // RETURNING on DELETE: capture the pre-image via an internal
        // SELECT that reuses the same WHERE, then run the delete with
        // the RETURNING clause stripped, then project the captured
        // rows through the requested items. The extra SELECT is a
        // pragmatic MVP — a future pass can fuse the scan with the
        // delete to avoid the second pass over the heap.
        if let Some(items) = query.returning.clone() {
            let select_sql = delete_to_select_sql(raw_query).ok_or_else(|| {
                RedDBError::Query(
                    "DELETE ... RETURNING: cannot rewrite query for pre-image scan".to_string(),
                )
            })?;
            let captured = self.execute_query(&select_sql)?;

            // Recursive call with `returning` stripped takes the normal
            // path below, including the RLS folding branch.
            let mut inner_query = query.clone();
            inner_query.returning = None;
            let _ = self.execute_delete(raw_query, &inner_query)?;

            let snapshots: Vec<Vec<(String, Value)>> = captured
                .result
                .records
                .iter()
                .map(|rec| {
                    rec.iter_fields()
                        .map(|(k, v)| (k.as_ref().to_string(), v.clone()))
                        .collect()
                })
                .collect();
            // NOTE(review): `affected` is derived from the pre-image
            // SELECT, not from the inner delete's own count — the two
            // could diverge if rows change between the scan and the
            // delete. Confirm whether the inner result's count should be
            // preferred here.
            let affected = snapshots.len() as u64;
            let result = build_returning_result(&items, &snapshots, None);

            let mut response = RuntimeQueryResult::dml_result(
                raw_query.to_string(),
                affected,
                "delete",
                "runtime-dml-returning",
            );
            response.result = result;
            return Ok(response);
        }
        // Row-Level Security enforcement (Phase 2.5.2 PG parity).
        //
        // When the table has RLS enabled, gate the DELETE by the
        // per-role policy set: mutations only touch rows that *every*
        // matching `FOR DELETE` policy would accept. No policies =>
        // zero rows affected (PG restrictive-default).
        if crate::runtime::impl_core::rls_is_enabled(self, &query.table) {
            let rls_filter = crate::runtime::impl_core::rls_policy_filter(
                self,
                &query.table,
                crate::storage::query::ast::PolicyAction::Delete,
            );
            let Some(policy) = rls_filter else {
                return Ok(RuntimeQueryResult::dml_result(
                    raw_query.to_string(),
                    0,
                    "delete",
                    "runtime-dml-rls",
                ));
            };
            // Fold the policy predicate into the user's WHERE before
            // dispatching — the remainder of this function reads the
            // filter from `query` via `effective_delete_filter`, which
            // respects the updated value.
            let mut augmented = query.clone();
            augmented.filter = Some(match augmented.filter.take() {
                Some(existing) => {
                    crate::storage::query::ast::Filter::And(Box::new(existing), Box::new(policy))
                }
                None => policy,
            });
            return self.execute_delete_inner(raw_query, &augmented);
        }
        self.execute_delete_inner(raw_query, query)
    }
1251
    /// Chunked DELETE core: scans for target ids, optionally snapshots
    /// JSON pre-images for delete-event subscribers, then removes the
    /// rows batch-by-batch, emitting events per batch.
    ///
    /// Pre-images must be captured before any physical removal, which is
    /// why the snapshot happens up front rather than inside the loop.
    fn execute_delete_inner(
        &self,
        raw_query: &str,
        query: &DeleteQuery,
    ) -> RedDBResult<RuntimeQueryResult> {
        let effective_filter = effective_delete_filter(query);

        // Find the rows that match the WHERE clause. The "find target
        // rows" loop lives in DmlTargetScan so UPDATE (#52) can reuse
        // the same scan strategy.
        let scan = super::dml_target_scan::DmlTargetScan::new(
            self,
            &query.table,
            effective_filter.as_ref(),
            None,
        );
        let ids_to_delete = scan.find_target_ids()?;

        // For event-enabled collections, snapshot the pre-delete state
        // before rows are physically removed.
        let needs_delete_events =
            !query.suppress_events && self.collection_has_delete_subscriptions(&query.table);
        let mut pre_images: HashMap<u64, crate::json::Value> = if needs_delete_events {
            scan.row_json_pre_images(&ids_to_delete)
        } else {
            HashMap::new()
        };

        let mut affected: u64 = 0;
        for chunk in ids_to_delete.chunks(UPDATE_APPLY_CHUNK_SIZE) {
            let (count, lsns) = self.delete_entities_batch(&query.table, chunk)?;
            affected += count;
            if needs_delete_events && !lsns.is_empty() {
                // lsns.len() == actually-deleted entities; align with chunk ids.
                // `delete_batch` may skip missing entities, so we correlate by
                // the number returned (they're emitted in chunk order).
                let deleted_chunk = &chunk[..lsns.len().min(chunk.len())];
                self.emit_delete_events_for_collection(
                    &query.table,
                    deleted_chunk,
                    &lsns,
                    &pre_images,
                )?;
            }
        }
        // Drop the snapshots eagerly — they can be large.
        pre_images.clear();

        if affected > 0 {
            self.note_table_write(&query.table);
        }

        Ok(RuntimeQueryResult::dml_result(
            raw_query.to_string(),
            affected,
            "delete",
            "runtime-dml",
        ))
    }
1310}
1311
1312fn build_patch_operations_from_materialized_assignments(
1313    compiled_plan: &CompiledUpdatePlan,
1314    assignments: MaterializedUpdateAssignments,
1315) -> Vec<PatchEntityOperation> {
1316    let mut operations = Vec::with_capacity(
1317        compiled_plan.static_field_assignments.len()
1318            + compiled_plan.static_metadata_assignments.len()
1319            + assignments.dynamic_field_assignments.len()
1320            + assignments.dynamic_metadata_assignments.len(),
1321    );
1322
1323    for (column, value) in &compiled_plan.static_field_assignments {
1324        operations.push(PatchEntityOperation {
1325            op: PatchEntityOperationType::Set,
1326            path: vec!["fields".to_string(), column.clone()],
1327            value: Some(storage_value_to_json(value)),
1328        });
1329    }
1330
1331    for (column, value) in assignments.dynamic_field_assignments {
1332        operations.push(PatchEntityOperation {
1333            op: PatchEntityOperationType::Set,
1334            path: vec!["fields".to_string(), column],
1335            value: Some(storage_value_to_json(&value)),
1336        });
1337    }
1338
1339    for (key, value) in &compiled_plan.static_metadata_assignments {
1340        operations.push(PatchEntityOperation {
1341            op: PatchEntityOperationType::Set,
1342            path: vec!["metadata".to_string(), key.clone()],
1343            value: Some(metadata_value_to_json(value)),
1344        });
1345    }
1346
1347    for (key, value) in assignments.dynamic_metadata_assignments {
1348        operations.push(PatchEntityOperation {
1349            op: PatchEntityOperationType::Set,
1350            path: vec!["metadata".to_string(), key],
1351            value: Some(metadata_value_to_json(&value)),
1352        });
1353    }
1354
1355    operations
1356}
1357
/// Rewrite `DELETE FROM <table> [WHERE …] [RETURNING …]` as
/// `SELECT * FROM <table> [WHERE …]` so the delete executor can
/// capture the pre-image before actually removing the rows. Returns
/// `None` when the input is not a DELETE statement.
///
/// Case-insensitive on the keywords and tolerant of any ASCII
/// whitespace (space, tab, newline) between them — the previous
/// implementation only accepted single-space separators. Preserves
/// everything between the table name and the RETURNING clause, so
/// WHERE / ORDER BY / LIMIT survive untouched. The RETURNING tail —
/// if present — is truncated at the first top-level `RETURNING`
/// token.
fn delete_to_select_sql(sql: &str) -> Option<String> {
    let trimmed = sql.trim_start();
    // ASCII lowercasing never changes byte length, so offsets into
    // `lowered` are valid in `trimmed` too.
    let lowered = trimmed.to_ascii_lowercase();
    if lowered.split_ascii_whitespace().next() != Some("delete") {
        return None;
    }
    // Locate the whitespace-bounded FROM keyword; this tolerates tabs
    // and newlines, unlike a literal `" from "` search.
    let from_idx = find_top_level_keyword(&lowered, "from")?;
    let body_start = from_idx + "from".len();
    // Trim the same leading whitespace from both views so truncation
    // offsets computed on the lowercase view apply to the original.
    let after_from = trimmed[body_start..].trim_start();
    let after_from_lc = lowered[body_start..].trim_start();

    // Cut off the RETURNING tail (a naive search — the RETURNING
    // clause only appears once per statement at top level in our
    // grammar). Matches whitespace-bounded tokens to avoid clipping
    // `RETURNING` inside a string literal.
    let mut body = after_from.to_string();
    if let Some(pos) = find_top_level_keyword(after_from_lc, "returning") {
        body.truncate(pos);
    }
    Some(format!("SELECT * FROM {}", body.trim_end()))
}

/// Find the byte offset of a whitespace-bounded keyword in a
/// lowercased haystack, skipping matches inside single-quoted
/// string literals. Naive — no escape handling — but enough for
/// the shapes the DML parser emits.
fn find_top_level_keyword(haystack: &str, needle: &str) -> Option<usize> {
    let bytes = haystack.as_bytes();
    let nlen = needle.len();
    let mut i = 0usize;
    let mut in_string = false;
    while i < bytes.len() {
        let c = bytes[i];
        if c == b'\'' {
            // Toggle on every quote: enter/leave a string literal.
            in_string = !in_string;
            i += 1;
            continue;
        }
        // A hit must be outside strings and bounded by whitespace (or
        // the haystack edge) on both sides.
        if !in_string
            && i + nlen <= bytes.len()
            && &bytes[i..i + nlen] == needle.as_bytes()
            && (i == 0 || bytes[i - 1].is_ascii_whitespace())
            && (i + nlen == bytes.len() || bytes[i + nlen].is_ascii_whitespace())
        {
            return Some(i);
        }
        i += 1;
    }
    None
}
1417
/// Build a `UnifiedResult` from the rows affected by a DML statement plus
/// its `RETURNING` clause. Each snapshot is a list of (column, value) pairs
/// for one affected row; `outputs`, when provided, supplies the engine-
/// assigned entity id for the same row (INSERT path). Projection honours
/// the RETURNING items: `*` expands to every snapshot column plus
/// `red_entity_id` when available.
fn build_returning_result(
    items: &[ReturningItem],
    snapshots: &[Vec<(String, Value)>],
    outputs: Option<&[crate::application::entity::CreateEntityOutput]>,
) -> UnifiedResult {
    let project_all = items.iter().any(|it| matches!(it, ReturningItem::All));

    let mut columns: Vec<String> = if project_all {
        let mut cols: Vec<String> = Vec::new();
        if outputs.is_some() {
            cols.push("red_entity_id".to_string());
        }
        // `*` expands from the FIRST snapshot only — assumes every affected
        // row shares the same column set. TODO confirm against callers.
        if let Some(first) = snapshots.first() {
            for (name, _) in first {
                cols.push(name.clone());
            }
        }
        cols
    } else {
        // Explicit column list: keep only named items, in RETURNING order.
        items
            .iter()
            .filter_map(|it| match it {
                ReturningItem::Column(c) => Some(c.clone()),
                ReturningItem::All => None,
            })
            .collect()
    };
    // Guarantee unique order-preserving column list (first occurrence wins).
    {
        let mut seen = std::collections::HashSet::new();
        columns.retain(|c| seen.insert(c.clone()));
    }

    let id_key = sys_key_red_entity_id();
    let mut records: Vec<UnifiedRecord> = Vec::with_capacity(snapshots.len());
    for (idx, snap) in snapshots.iter().enumerate() {
        let mut values: HashMap<Arc<str>, Value> = HashMap::with_capacity(columns.len());
        // Engine-assigned id first; outputs[idx] pairs with snapshots[idx].
        if let Some(outs) = outputs {
            if let Some(out) = outs.get(idx) {
                values.insert(Arc::clone(&id_key), Value::Integer(out.id.raw() as i64));
            }
        }
        // Snapshot values inserted second, so a snapshot column literally
        // named `red_entity_id` would overwrite the engine-assigned id.
        for (name, val) in snap {
            values.insert(Arc::from(name.as_str()), val.clone());
        }
        let mut rec = UnifiedRecord::default();
        // Only keep projected columns on the record.
        for col in &columns {
            if let Some(v) = values.get(col.as_str()) {
                rec.set_arc(Arc::from(col.as_str()), v.clone());
            }
        }
        records.push(rec);
    }

    UnifiedResult {
        columns,
        records,
        stats: Default::default(),
        pre_serialized_json: None,
    }
}
1486
/// Remove duplicate column names (case-insensitively), keeping the first
/// occurrence of each name in its original order and spelling.
///
/// Replaces the previous O(n²) pairwise `eq_ignore_ascii_case` scan with
/// a single pass over a set of ASCII-lowercased keys, which is equivalent
/// because `eq_ignore_ascii_case` only folds ASCII letters.
fn dedupe_update_columns(columns: Vec<String>) -> Vec<String> {
    let mut columns = columns;
    let mut seen = std::collections::HashSet::with_capacity(columns.len());
    // `insert` returns false for an already-seen key, dropping duplicates
    // while preserving order and the first occurrence's original casing.
    columns.retain(|column| seen.insert(column.to_ascii_lowercase()));
    columns
}
1503
1504// =============================================================================
1505// Helper functions for extracting typed values from column/value pairs
1506// =============================================================================
1507
const SQL_TTL_METADATA_COLUMNS: [&str; 3] = ["_ttl", "_ttl_ms", "_expires_at"];

/// Map a column name onto its canonical TTL metadata key, matching
/// case-insensitively. Returns `None` for ordinary (non-TTL) columns.
fn resolve_sql_ttl_metadata_key(column: &str) -> Option<&'static str> {
    SQL_TTL_METADATA_COLUMNS
        .iter()
        .copied()
        .find(|candidate| column.eq_ignore_ascii_case(candidate))
}
1521
1522/// Canonicalize a SQL TTL metadata `(key, value)` pair so the retention
1523/// sweeper sees a single key (`_ttl_ms`) regardless of which legacy form
1524/// the operator wrote. `_ttl` is scaled from seconds to milliseconds;
1525/// `_ttl_ms` and `_expires_at` are passed through.
1526fn canonicalize_sql_ttl_metadata(
1527    key: &'static str,
1528    value: MetadataValue,
1529) -> (&'static str, MetadataValue) {
1530    if key != "_ttl" {
1531        return (key, value);
1532    }
1533    let scaled = match value {
1534        MetadataValue::Int(s) => MetadataValue::Int(s.saturating_mul(1_000)),
1535        MetadataValue::Timestamp(ms_or_s) => {
1536            // Timestamp is already chosen for very large values; treat as
1537            // already-ms to avoid silent overflow.
1538            MetadataValue::Timestamp(ms_or_s)
1539        }
1540        MetadataValue::Float(f) => MetadataValue::Float(f * 1_000.0),
1541        other => other,
1542    };
1543    ("_ttl_ms", scaled)
1544}
1545
/// Sentinel prefix produced by the parser for `PASSWORD('...')` and
/// `SECRET('...')` literals. The runtime strips this marker and
/// applies the actual crypto transform during INSERT execution.
/// It appears as a string prefix on `Value::Password` and as a byte
/// prefix on `Value::Secret` (see `resolve_crypto_sentinel`).
pub(crate) const PLAINTEXT_SENTINEL: &str = "@@plain@@";
1550
impl RedDBRuntime {
    /// Strip the plaintext sentinel from a `Value::Password` or
    /// `Value::Secret` produced by the parser and apply the real
    /// crypto transform. `Password` is always hashed with argon2id.
    /// `Secret` is encrypted with AES-256-GCM keyed by the vault
    /// when `red.config.secret.auto_encrypt = true` (default).
    ///
    /// Values without the sentinel (already hashed / pre-encrypted) and
    /// all other `Value` variants pass through unchanged.
    pub(crate) fn resolve_crypto_sentinel(&self, value: Value) -> RedDBResult<Value> {
        match value {
            Value::Password(marked) => {
                if let Some(plain) = marked.strip_prefix(PLAINTEXT_SENTINEL) {
                    Ok(Value::Password(crate::auth::store::hash_password(plain)))
                } else {
                    // No sentinel: treat as an already-hashed value.
                    Ok(Value::Password(marked))
                }
            }
            Value::Secret(bytes) => {
                if bytes.starts_with(PLAINTEXT_SENTINEL.as_bytes()) {
                    // Encrypting plaintext requires the feature flag AND a
                    // vault-provisioned AES key; fail loudly otherwise so
                    // plaintext is never written to storage silently.
                    if !self.secret_auto_encrypt() {
                        return Err(RedDBError::Query(
                            "SECRET() literal rejected: red.config.secret.auto_encrypt \
                             is false. Insert pre-encrypted bytes directly instead."
                                .to_string(),
                        ));
                    }
                    let key = self.secret_aes_key().ok_or_else(|| {
                        RedDBError::Query(
                            "SECRET() column encryption requires a bootstrapped \
                             vault (red.secret.aes_key is missing). Start the server \
                             with --vault to enable."
                                .to_string(),
                        )
                    })?;
                    // Skip the sentinel bytes; only the payload is encrypted.
                    let plain = &bytes[PLAINTEXT_SENTINEL.len()..];
                    Ok(Value::Secret(encrypt_secret_payload(&key, plain)))
                } else {
                    // No sentinel: assume the bytes are already encrypted.
                    Ok(Value::Secret(bytes))
                }
            }
            other => Ok(other),
        }
    }
}
1593
1594/// Encode an AES-256-GCM ciphertext as `[12-byte nonce][ciphertext||tag]`.
1595/// This is the on-disk representation of `Value::Secret`.
1596fn encrypt_secret_payload(key: &[u8; 32], plaintext: &[u8]) -> Vec<u8> {
1597    let nonce_bytes = crate::auth::store::random_bytes(12);
1598    let mut nonce = [0u8; 12];
1599    nonce.copy_from_slice(&nonce_bytes[..12]);
1600    let ct = crate::crypto::aes_gcm::aes256_gcm_encrypt(key, &nonce, b"reddb.secret", plaintext);
1601    let mut out = Vec::with_capacity(12 + ct.len());
1602    out.extend_from_slice(&nonce);
1603    out.extend_from_slice(&ct);
1604    out
1605}
1606
1607/// Decode a `Value::Secret` payload back to plaintext. Returns
1608/// `None` when the payload is too short or AES-GCM authentication
1609/// fails (tampered or wrong key).
1610pub(crate) fn decrypt_secret_payload(key: &[u8; 32], payload: &[u8]) -> Option<Vec<u8>> {
1611    if payload.len() < 12 {
1612        return None;
1613    }
1614    let mut nonce = [0u8; 12];
1615    nonce.copy_from_slice(&payload[..12]);
1616    crate::crypto::aes_gcm::aes256_gcm_decrypt(key, &nonce, b"reddb.secret", &payload[12..]).ok()
1617}
1618
1619fn split_insert_metadata(
1620    runtime: &RedDBRuntime,
1621    columns: &[String],
1622    values: &[Value],
1623) -> RedDBResult<(Vec<(String, Value)>, Vec<(String, MetadataValue)>)> {
1624    let mut fields = Vec::new();
1625    let mut metadata = Vec::new();
1626
1627    for (column, value) in columns.iter().zip(values.iter()) {
1628        // Still support legacy _ttl columns for backward compat
1629        if let Some(metadata_key) = resolve_sql_ttl_metadata_key(column) {
1630            let raw_value = sql_literal_to_metadata_value(metadata_key, value)?;
1631            let (canonical_key, canonical_value) =
1632                canonicalize_sql_ttl_metadata(metadata_key, raw_value);
1633            metadata.push((canonical_key.to_string(), canonical_value));
1634            continue;
1635        }
1636        fields.push((
1637            column.clone(),
1638            runtime.resolve_crypto_sentinel(value.clone())?,
1639        ));
1640    }
1641
1642    Ok((fields, metadata))
1643}
1644
1645/// Merge structured WITH TTL, WITH EXPIRES AT, and WITH METADATA clauses into metadata entries.
1646fn merge_with_clauses(
1647    metadata: &mut Vec<(String, MetadataValue)>,
1648    ttl_ms: Option<u64>,
1649    expires_at_ms: Option<u64>,
1650    with_metadata: &[(String, Value)],
1651) {
1652    if let Some(ms) = ttl_ms {
1653        metadata.push((
1654            "_ttl_ms".to_string(),
1655            if ms <= i64::MAX as u64 {
1656                MetadataValue::Int(ms as i64)
1657            } else {
1658                MetadataValue::Timestamp(ms)
1659            },
1660        ));
1661    }
1662    if let Some(ms) = expires_at_ms {
1663        metadata.push(("_expires_at".to_string(), MetadataValue::Timestamp(ms)));
1664    }
1665    for (key, value) in with_metadata {
1666        let meta_value = match value {
1667            Value::Text(s) => MetadataValue::String(s.to_string()),
1668            Value::Integer(n) => MetadataValue::Int(*n),
1669            Value::Float(n) => MetadataValue::Float(*n),
1670            Value::Boolean(b) => MetadataValue::Bool(*b),
1671            _ => MetadataValue::String(value.to_string()),
1672        };
1673        metadata.push((key.clone(), meta_value));
1674    }
1675}
1676
1677fn apply_collection_default_ttl_metadata(
1678    runtime: &RedDBRuntime,
1679    collection: &str,
1680    metadata: &mut Vec<(String, MetadataValue)>,
1681) {
1682    if has_internal_ttl_metadata(metadata) {
1683        return;
1684    }
1685
1686    let Some(default_ttl_ms) = runtime.db().collection_default_ttl_ms(collection) else {
1687        return;
1688    };
1689
1690    metadata.push((
1691        "_ttl_ms".to_string(),
1692        if default_ttl_ms <= i64::MAX as u64 {
1693            MetadataValue::Int(default_ttl_ms as i64)
1694        } else {
1695            MetadataValue::Timestamp(default_ttl_ms)
1696        },
1697    ));
1698}
1699
1700fn ensure_non_tree_reserved_metadata_entries(
1701    metadata: &[(String, MetadataValue)],
1702) -> RedDBResult<()> {
1703    for (key, _) in metadata {
1704        ensure_non_tree_reserved_metadata_key(key)?;
1705    }
1706    Ok(())
1707}
1708
1709fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
1710    if key.starts_with(TREE_METADATA_PREFIX) {
1711        return Err(RedDBError::Query(format!(
1712            "metadata key '{}' is reserved for managed trees",
1713            key
1714        )));
1715    }
1716    Ok(())
1717}
1718
1719fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
1720    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
1721        return Err(RedDBError::Query(format!(
1722            "edge label '{}' is reserved for managed trees",
1723            TREE_CHILD_EDGE_LABEL
1724        )));
1725    }
1726    Ok(())
1727}
1728
1729fn pairwise_columns_values(pairs: &[(String, Value)]) -> (Vec<String>, Vec<Value>) {
1730    let mut columns = Vec::with_capacity(pairs.len());
1731    let mut values = Vec::with_capacity(pairs.len());
1732
1733    for (column, value) in pairs {
1734        columns.push(column.clone());
1735        values.push(value.clone());
1736    }
1737
1738    (columns, values)
1739}
1740
1741/// Find a required column value and return it as-is.
1742fn find_column_value(columns: &[String], values: &[Value], name: &str) -> RedDBResult<Value> {
1743    for (i, col) in columns.iter().enumerate() {
1744        if col.eq_ignore_ascii_case(name) {
1745            return Ok(values[i].clone());
1746        }
1747    }
1748    Err(RedDBError::Query(format!(
1749        "required column '{name}' not found in INSERT"
1750    )))
1751}
1752
1753/// Find a required column value and coerce to String.
1754fn find_column_value_string(
1755    columns: &[String],
1756    values: &[Value],
1757    name: &str,
1758) -> RedDBResult<String> {
1759    let val = find_column_value(columns, values, name)?;
1760    match val {
1761        Value::Text(s) => Ok(s.to_string()),
1762        Value::Integer(n) => Ok(n.to_string()),
1763        Value::Float(n) => Ok(n.to_string()),
1764        other => Err(RedDBError::Query(format!(
1765            "column '{name}' expected text, got {other:?}"
1766        ))),
1767    }
1768}
1769
1770fn find_column_value_f64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<f64> {
1771    let val = find_column_value(columns, values, name)?;
1772    match val {
1773        Value::Float(n) => Ok(n),
1774        Value::Integer(n) => Ok(n as f64),
1775        Value::UnsignedInteger(n) => Ok(n as f64),
1776        Value::Text(s) => s
1777            .parse::<f64>()
1778            .map_err(|_| RedDBError::Query(format!("column '{name}' expected number, got '{s}'"))),
1779        other => Err(RedDBError::Query(format!(
1780            "column '{name}' expected number, got {other:?}"
1781        ))),
1782    }
1783}
1784
1785/// Find an optional column value as String.
1786fn find_column_value_opt_string(
1787    columns: &[String],
1788    values: &[Value],
1789    name: &str,
1790) -> Option<String> {
1791    for (i, col) in columns.iter().enumerate() {
1792        if col.eq_ignore_ascii_case(name) {
1793            return match &values[i] {
1794                Value::Null => None,
1795                Value::Text(s) => Some(s.to_string()),
1796                Value::Integer(n) => Some(n.to_string()),
1797                Value::Float(n) => Some(n.to_string()),
1798                _ => None,
1799            };
1800        }
1801    }
1802    None
1803}
1804
1805/// Find a required column value and coerce to u64.
1806fn find_column_value_u64(columns: &[String], values: &[Value], name: &str) -> RedDBResult<u64> {
1807    let val = find_column_value(columns, values, name)?;
1808    match val {
1809        Value::Integer(n) => Ok(n as u64),
1810        Value::UnsignedInteger(n) => Ok(n),
1811        Value::Text(s) => s
1812            .parse::<u64>()
1813            .map_err(|_| RedDBError::Query(format!("column '{name}' expected integer, got '{s}'"))),
1814        other => Err(RedDBError::Query(format!(
1815            "column '{name}' expected integer, got {other:?}"
1816        ))),
1817    }
1818}
1819
1820/// Find an optional column value as f32.
1821fn find_column_value_f32_opt(columns: &[String], values: &[Value], name: &str) -> Option<f32> {
1822    for (i, col) in columns.iter().enumerate() {
1823        if col.eq_ignore_ascii_case(name) {
1824            return match &values[i] {
1825                Value::Float(n) => Some(*n as f32),
1826                Value::Integer(n) => Some(*n as f32),
1827                Value::Null => None,
1828                _ => None,
1829            };
1830        }
1831    }
1832    None
1833}
1834
1835/// Find a required column value and coerce to Vec<f32> (from Value::Vector).
1836fn find_column_value_vec_f32(
1837    columns: &[String],
1838    values: &[Value],
1839    name: &str,
1840) -> RedDBResult<Vec<f32>> {
1841    let val = find_column_value(columns, values, name)?;
1842    match val {
1843        Value::Vector(v) => Ok(v),
1844        Value::Json(bytes) => {
1845            // Try to parse as JSON array of numbers
1846            let s = std::str::from_utf8(&bytes).map_err(|_| {
1847                RedDBError::Query(format!("column '{name}' contains invalid UTF-8"))
1848            })?;
1849            let arr: Vec<f32> = crate::json::from_str(s).map_err(|e| {
1850                RedDBError::Query(format!("column '{name}' invalid vector JSON: {e}"))
1851            })?;
1852            Ok(arr)
1853        }
1854        other => Err(RedDBError::Query(format!(
1855            "column '{name}' expected vector, got {other:?}"
1856        ))),
1857    }
1858}
1859
1860/// Extract remaining properties (all columns not in the exclusion list).
1861fn extract_remaining_properties(
1862    columns: &[String],
1863    values: &[Value],
1864    exclude: &[&str],
1865) -> Vec<(String, Value)> {
1866    columns
1867        .iter()
1868        .zip(values.iter())
1869        .filter(|(col, _)| !exclude.iter().any(|e| col.eq_ignore_ascii_case(e)))
1870        .map(|(col, val)| (col.clone(), val.clone()))
1871        .collect()
1872}
1873
1874fn validate_timeseries_insert_columns(columns: &[String]) -> RedDBResult<()> {
1875    let mut invalid = Vec::new();
1876    for column in columns {
1877        if !is_timeseries_insert_column(column) && resolve_sql_ttl_metadata_key(column).is_none() {
1878            invalid.push(column.clone());
1879        }
1880    }
1881
1882    if invalid.is_empty() {
1883        Ok(())
1884    } else {
1885        Err(RedDBError::Query(format!(
1886            "timeseries INSERT only accepts metric, value, tags, timestamp, timestamp_ns, or time columns; got {}",
1887            invalid.join(", ")
1888        )))
1889    }
1890}
1891
/// True when `column` (compared case-insensitively) is one of the
/// columns a timeseries INSERT accepts.
fn is_timeseries_insert_column(column: &str) -> bool {
    const ACCEPTED: [&str; 6] = ["metric", "value", "tags", "timestamp", "timestamp_ns", "time"];
    ACCEPTED
        .iter()
        .any(|accepted| column.eq_ignore_ascii_case(accepted))
}
1898
/// Resolve the timestamp for a timeseries INSERT. Accepts at most one of
/// the aliases `timestamp_ns`, `timestamp`, or `time` across all columns;
/// returns `Ok(None)` when none is present.
///
/// The value is coerced as soon as a match is found, so when a row has
/// both an invalid timestamp and a duplicate, the coercion error for the
/// first match (in alias-list order) is reported, not the duplicate error.
fn find_timeseries_timestamp_ns(columns: &[String], values: &[Value]) -> RedDBResult<Option<u64>> {
    let mut found = None;

    // Scan aliases in fixed order; any second match — same alias twice or
    // two different aliases — is a duplicate-timestamp error.
    for alias in ["timestamp_ns", "timestamp", "time"] {
        for (index, column) in columns.iter().enumerate() {
            if !column.eq_ignore_ascii_case(alias) {
                continue;
            }

            if found.is_some() {
                return Err(RedDBError::Query(
                    "timeseries INSERT accepts only one timestamp column".to_string(),
                ));
            }

            found = Some(coerce_value_to_non_negative_u64(&values[index], alias)?);
        }
    }

    Ok(found)
}
1920
1921fn find_timeseries_tags(
1922    columns: &[String],
1923    values: &[Value],
1924) -> RedDBResult<std::collections::HashMap<String, String>> {
1925    for (index, column) in columns.iter().enumerate() {
1926        if column.eq_ignore_ascii_case("tags") {
1927            return parse_timeseries_tags(&values[index]);
1928        }
1929    }
1930    Ok(std::collections::HashMap::new())
1931}
1932
1933fn parse_timeseries_tags(value: &Value) -> RedDBResult<std::collections::HashMap<String, String>> {
1934    match value {
1935        Value::Null => Ok(std::collections::HashMap::new()),
1936        Value::Json(bytes) => parse_timeseries_tags_json(bytes),
1937        Value::Text(text) => parse_timeseries_tags_json(text.as_bytes()),
1938        other => Err(RedDBError::Query(format!(
1939            "timeseries tags must be a JSON object or JSON text, got {other:?}"
1940        ))),
1941    }
1942}
1943
1944fn parse_timeseries_tags_json(
1945    bytes: &[u8],
1946) -> RedDBResult<std::collections::HashMap<String, String>> {
1947    let json: crate::json::Value = crate::json::from_slice(bytes)
1948        .map_err(|err| RedDBError::Query(format!("timeseries tags must be valid JSON: {err}")))?;
1949
1950    let object = match json {
1951        crate::json::Value::Object(object) => object,
1952        other => {
1953            return Err(RedDBError::Query(format!(
1954                "timeseries tags must be a JSON object, got {other:?}"
1955            )))
1956        }
1957    };
1958
1959    let mut tags = std::collections::HashMap::with_capacity(object.len());
1960    for (key, value) in object {
1961        tags.insert(key, json_tag_value_to_string(&value));
1962    }
1963    Ok(tags)
1964}
1965
1966fn json_tag_value_to_string(value: &crate::json::Value) -> String {
1967    match value {
1968        crate::json::Value::Null => "null".to_string(),
1969        crate::json::Value::Bool(value) => value.to_string(),
1970        crate::json::Value::Number(value) => value.to_string(),
1971        crate::json::Value::String(value) => value.clone(),
1972        other => other.to_string(),
1973    }
1974}
1975
1976fn coerce_value_to_non_negative_u64(value: &Value, column: &str) -> RedDBResult<u64> {
1977    match value {
1978        Value::UnsignedInteger(value) => Ok(*value),
1979        Value::Integer(value) if *value >= 0 => Ok(*value as u64),
1980        Value::Float(value) if *value >= 0.0 => Ok(*value as u64),
1981        Value::Text(value) => value.parse::<u64>().map_err(|_| {
1982            RedDBError::Query(format!(
1983                "column '{column}' expected a non-negative integer timestamp, got '{value}'"
1984            ))
1985        }),
1986        other => Err(RedDBError::Query(format!(
1987            "column '{column}' expected a non-negative integer timestamp, got {other:?}"
1988        ))),
1989    }
1990}
1991
/// Current wall-clock time as nanoseconds since the Unix epoch,
/// saturating at `u64::MAX`; a clock before the epoch yields 0.
fn current_unix_ns() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    u64::try_from(nanos).unwrap_or(u64::MAX)
}
1999
/// Convert an internal `MetadataValue` into the crate's JSON value type.
///
/// Every numeric variant (`Int`, `Timestamp`, byte elements, reference
/// ids) is emitted as an `f64`-backed JSON number, so integers with
/// magnitude above 2^53 can lose precision in the JSON form.
fn metadata_value_to_json(value: &MetadataValue) -> crate::json::Value {
    use crate::json::{Map, Value as JV};
    match value {
        MetadataValue::Null => JV::Null,
        MetadataValue::Bool(value) => JV::Bool(*value),
        MetadataValue::Int(value) => JV::Number(*value as f64),
        MetadataValue::Float(value) => JV::Number(*value),
        MetadataValue::String(value) => JV::String(value.clone()),
        // Bytes become an array of per-byte numbers, not base64.
        MetadataValue::Bytes(value) => JV::Array(
            value
                .iter()
                .map(|value| JV::Number(*value as f64))
                .collect(),
        ),
        MetadataValue::Timestamp(value) => JV::Number(*value as f64),
        MetadataValue::Array(values) => {
            JV::Array(values.iter().map(metadata_value_to_json).collect())
        }
        MetadataValue::Object(object) => {
            let entries = object
                .iter()
                .map(|(key, value)| (key.clone(), metadata_value_to_json(value)))
                .collect();
            JV::Object(entries)
        }
        // Geo points serialize as `{"lat": ..., "lon": ...}`.
        MetadataValue::Geo { lat, lon } => {
            let mut object = Map::new();
            object.insert("lat".to_string(), JV::Number(*lat));
            object.insert("lon".to_string(), JV::Number(*lon));
            JV::Object(object)
        }
        // References serialize as `{"collection": ..., "entity_id": ...}`.
        MetadataValue::Reference(target) => {
            let mut object = Map::new();
            object.insert(
                "collection".to_string(),
                JV::String(target.collection().to_string()),
            );
            object.insert(
                "entity_id".to_string(),
                JV::Number(target.entity_id().raw() as f64),
            );
            JV::Object(object)
        }
        MetadataValue::References(values) => {
            let refs = values
                .iter()
                .map(|target| {
                    let mut object = Map::new();
                    object.insert(
                        "collection".to_string(),
                        JV::String(target.collection().to_string()),
                    );
                    object.insert(
                        "entity_id".to_string(),
                        JV::Number(target.entity_id().raw() as f64),
                    );
                    JV::Object(object)
                })
                .collect();
            JV::Array(refs)
        }
    }
}
2063
/// Convert a storage `Value` into a `MetadataValue`.
///
/// Scalars map to their natural metadata variants; types without a
/// direct counterpart (JSON, UUID, date/time, decimal, big int) are
/// stringified. Non-negative u64-range values go through
/// `metadata_u64_to_value` so magnitudes above `i64::MAX` survive as
/// `Timestamp` instead of overflowing `Int`.
fn storage_value_to_metadata_value(value: &Value) -> MetadataValue {
    match value {
        Value::Null => MetadataValue::Null,
        Value::Boolean(value) => MetadataValue::Bool(*value),
        Value::Integer(value) => MetadataValue::Int(*value),
        Value::UnsignedInteger(value) => metadata_u64_to_value(*value),
        Value::Float(value) => MetadataValue::Float(*value),
        Value::Text(value) => MetadataValue::String(value.to_string()),
        Value::Blob(value) => MetadataValue::Bytes(value.clone()),
        Value::Timestamp(value) => {
            if *value >= 0 {
                metadata_u64_to_value(*value as u64)
            } else {
                // Pre-epoch timestamps stay signed.
                MetadataValue::Int(*value)
            }
        }
        Value::TimestampMs(value) => {
            if *value >= 0 {
                metadata_u64_to_value(*value as u64)
            } else {
                MetadataValue::Int(*value)
            }
        }
        Value::Json(value) => MetadataValue::String(String::from_utf8_lossy(value).into_owned()),
        // NOTE(review): Uuid is rendered via `{:?}` (Debug) — confirm this
        // matches the canonical display form expected by consumers.
        Value::Uuid(value) => MetadataValue::String(format!("{value:?}")),
        Value::Date(value) => MetadataValue::String(value.to_string()),
        Value::Time(value) => MetadataValue::String(value.to_string()),
        Value::Decimal(value) => MetadataValue::String(value.to_string()),
        // IPv4 stored as a packed u32, rendered in dotted-quad form.
        Value::Ipv4(value) => MetadataValue::String(format!(
            "{}.{}.{}.{}",
            (value >> 24) & 0xFF,
            (value >> 16) & 0xFF,
            (value >> 8) & 0xFF,
            value & 0xFF
        )),
        Value::Port(value) => MetadataValue::Int(i64::from(*value)),
        // Coordinates divide by 1_000_000 — presumably stored as
        // micro-degrees; verify against the Value encoding.
        Value::Latitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
        Value::Longitude(value) => MetadataValue::Float(*value as f64 / 1_000_000.0),
        Value::GeoPoint(lat, lon) => MetadataValue::Geo {
            lat: *lat as f64 / 1_000_000.0,
            lon: *lon as f64 / 1_000_000.0,
        },
        Value::BigInt(value) => MetadataValue::String(value.to_string()),
        Value::TableRef(value) => MetadataValue::String(value.clone()),
        Value::PageRef(value) => MetadataValue::Int(*value as i64),
        Value::Password(value) => MetadataValue::String(value.clone()),
        Value::Array(values) => {
            MetadataValue::Array(values.iter().map(storage_value_to_metadata_value).collect())
        }
        // Anything unmatched falls back to its Display form.
        _ => MetadataValue::String(value.to_string()),
    }
}
2116
2117fn sql_literal_to_metadata_value(field: &str, value: &Value) -> RedDBResult<MetadataValue> {
2118    match value {
2119        Value::Null => Ok(MetadataValue::Null),
2120        Value::Integer(value) if *value >= 0 => Ok(metadata_u64_to_value(*value as u64)),
2121        Value::Integer(_) => Err(RedDBError::Query(format!(
2122            "column '{field}' must be non-negative for TTL metadata"
2123        ))),
2124        Value::UnsignedInteger(value) => Ok(metadata_u64_to_value(*value)),
2125        Value::Float(value) if value.is_finite() => {
2126            if value.fract().abs() >= f64::EPSILON {
2127                return Err(RedDBError::Query(format!(
2128                    "column '{field}' must be an integer (TTL metadata must be an integer)"
2129                )));
2130            }
2131            if *value < 0.0 {
2132                return Err(RedDBError::Query(format!(
2133                    "column '{field}' must be non-negative for TTL metadata"
2134                )));
2135            }
2136            if *value > u64::MAX as f64 {
2137                return Err(RedDBError::Query(format!(
2138                    "column '{field}' value is too large"
2139                )));
2140            }
2141            Ok(metadata_u64_to_value(*value as u64))
2142        }
2143        Value::Float(_) => Err(RedDBError::Query(format!(
2144            "column '{field}' must be a finite number"
2145        ))),
2146        Value::Text(value) => {
2147            let value = value.trim();
2148            if let Ok(value) = value.parse::<u64>() {
2149                Ok(metadata_u64_to_value(value))
2150            } else if let Ok(value) = value.parse::<i64>() {
2151                if value < 0 {
2152                    return Err(RedDBError::Query(format!(
2153                        "column '{field}' must be non-negative for TTL metadata"
2154                    )));
2155                }
2156                Ok(metadata_u64_to_value(value as u64))
2157            } else if let Ok(value) = value.parse::<f64>() {
2158                if !value.is_finite() {
2159                    return Err(RedDBError::Query(format!(
2160                        "column '{field}' must be a finite number"
2161                    )));
2162                }
2163                if value.fract().abs() >= f64::EPSILON {
2164                    return Err(RedDBError::Query(format!(
2165                        "column '{field}' must be an integer (TTL metadata must be an integer)"
2166                    )));
2167                }
2168                if value < 0.0 {
2169                    return Err(RedDBError::Query(format!(
2170                        "column '{field}' must be non-negative for TTL metadata"
2171                    )));
2172                }
2173                if value > u64::MAX as f64 {
2174                    return Err(RedDBError::Query(format!(
2175                        "column '{field}' value is too large"
2176                    )));
2177                }
2178                Ok(metadata_u64_to_value(value as u64))
2179            } else {
2180                Err(RedDBError::Query(format!(
2181                    "column '{field}' expects a numeric value for TTL metadata"
2182                )))
2183            }
2184        }
2185        _ => Err(RedDBError::Query(format!(
2186            "column '{field}' expects a numeric value for TTL metadata"
2187        ))),
2188    }
2189}
2190
2191fn metadata_u64_to_value(value: u64) -> MetadataValue {
2192    if value <= i64::MAX as u64 {
2193        MetadataValue::Int(value as i64)
2194    } else {
2195        MetadataValue::Timestamp(value)
2196    }
2197}
2198
/// Phase 2 PG parity: inspect a column value and return `true` when
/// the dotted `tail` path is already present under it. Used by the
/// tenant auto-fill so rows that already carry an explicit value
/// (bulk import, admin insert on behalf of a tenant) are not
/// double-stamped with the session's current_tenant().
///
/// Non-JSON values, unparsable payloads, missing path segments, and an
/// explicit JSON `null` leaf all count as "not set".
fn dotted_tail_already_set(value: &Value, tail: &str) -> bool {
    let json = match value {
        Value::Null => return false,
        Value::Json(bytes) | Value::Blob(bytes) => {
            match crate::json::from_slice::<crate::json::Value>(bytes) {
                Ok(v) => v,
                Err(_) => return false,
            }
        }
        Value::Text(s) => {
            // Only attempt to parse text that even looks like a JSON
            // container; plain scalars are cheap-rejected here.
            let trimmed = s.trim_start();
            if !(trimmed.starts_with('{') || trimmed.starts_with('[')) {
                return false;
            }
            match crate::json::from_str::<crate::json::Value>(s) {
                Ok(v) => v,
                Err(_) => return false,
            }
        }
        _ => return false,
    };
    // Walk the dotted path one object key at a time; any non-object
    // intermediate or missing key means the path is not set.
    let mut cursor = &json;
    for seg in tail.split('.') {
        match cursor {
            crate::json::Value::Object(map) => match map.iter().find(|(k, _)| *k == seg) {
                Some((_, v)) => cursor = v,
                None => return false,
            },
            _ => return false,
        }
    }
    !matches!(cursor, crate::json::Value::Null)
}
2237
/// Phase 2 PG parity: take a column value (possibly Null / Text /
/// Json) and return a `Value::Json` with the dotted `tail` path set
/// to `tenant_id`. Preserves every pre-existing key.
///
/// Accepts:
/// * `Value::Null`  → fresh `{tail: tenant_id}` object
/// * `Value::Json(bytes)` / `Value::Blob(bytes)` → parse, navigate /
///   create path, re-serialize
/// * `Value::Text(s)` if `s` is valid JSON (blank text is treated
///   like Null) → same as Json
/// * anything else → error (user supplied a scalar where we need
///   a JSON container)
///
/// # Errors
/// Returns `RedDBError::Query` when the root cannot be parsed as JSON,
/// when an intermediate path segment lands on a non-object, or when
/// re-serialization fails.
fn merge_dotted_tenant(current: Value, tail: &str, tenant_id: &str) -> RedDBResult<Value> {
    let mut root = match current {
        Value::Null => crate::json::Value::Object(Default::default()),
        Value::Json(bytes) | Value::Blob(bytes) => {
            crate::json::from_slice(&bytes).map_err(|err| {
                RedDBError::Query(format!(
                    "tenant auto-fill: root column is not valid JSON ({err})"
                ))
            })?
        }
        Value::Text(s) => {
            // Blank text behaves like NULL: start from an empty object.
            if s.trim().is_empty() {
                crate::json::Value::Object(Default::default())
            } else {
                crate::json::from_str::<crate::json::Value>(&s).map_err(|err| {
                    RedDBError::Query(format!(
                        "tenant auto-fill: text root is not valid JSON ({err})"
                    ))
                })?
            }
        }
        other => {
            return Err(RedDBError::Query(format!(
                "tenant auto-fill: root column must be JSON / NULL, got {other:?}"
            )));
        }
    };

    // Navigate path segments, creating intermediate objects on demand.
    let segments: Vec<&str> = tail.split('.').collect();
    let mut cursor: &mut crate::json::Value = &mut root;
    for (i, seg) in segments.iter().enumerate() {
        let is_last = i + 1 == segments.len();
        let map = match cursor {
            crate::json::Value::Object(m) => m,
            _ => {
                return Err(RedDBError::Query(format!(
                    "tenant auto-fill: segment '{seg}' is not inside an object"
                )));
            }
        };
        if is_last {
            // Final segment: create-or-overwrite with the tenant id.
            map.insert(
                seg.to_string(),
                crate::json::Value::String(tenant_id.to_string()),
            );
            break;
        }
        cursor = map
            .entry(seg.to_string())
            .or_insert_with(|| crate::json::Value::Object(Default::default()));
    }

    let bytes = crate::json::to_vec(&root).map_err(|err| {
        RedDBError::Query(format!(
            "tenant auto-fill: failed to re-serialize JSON ({err})"
        ))
    })?;
    Ok(Value::Json(bytes))
}
2308
2309#[cfg(test)]
2310mod tests {
2311    use crate::storage::schema::Value;
2312    use crate::{RedDBOptions, RedDBRuntime};
2313
2314    #[test]
2315    fn update_where_id_in_with_hash_index_updates_expected_rows() {
2316        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2317        rt.execute_query("CREATE TABLE users (id INT, score INT)")
2318            .unwrap();
2319        for id in 0..5 {
2320            rt.execute_query(&format!("INSERT INTO users (id, score) VALUES ({id}, 0)"))
2321                .unwrap();
2322        }
2323        rt.execute_query("CREATE INDEX idx_id ON users (id) USING HASH")
2324            .unwrap();
2325
2326        let updated = rt
2327            .execute_query("UPDATE users SET score = 42 WHERE id IN (1,3,4)")
2328            .unwrap();
2329        assert_eq!(updated.affected_rows, 3);
2330
2331        let selected = rt
2332            .execute_query("SELECT id, score FROM users ORDER BY id")
2333            .unwrap();
2334        let scores: Vec<(i64, i64)> = selected
2335            .result
2336            .records
2337            .iter()
2338            .map(|record| {
2339                let id = match record.get("id").unwrap() {
2340                    Value::Integer(value) => *value,
2341                    other => panic!("expected integer id, got {other:?}"),
2342                };
2343                let score = match record.get("score").unwrap() {
2344                    Value::Integer(value) => *value,
2345                    other => panic!("expected integer score, got {other:?}"),
2346                };
2347                (id, score)
2348            })
2349            .collect();
2350        assert_eq!(scores, vec![(0, 0), (1, 42), (2, 0), (3, 42), (4, 42)]);
2351    }
2352
    /// Drives UPDATE through the shared `DmlTargetScan` module — the
    /// same code path DELETE uses (#51, #52). Exercises the indexed
    /// equality fast-path (WHERE id = N with a HASH index), the
    /// unindexed range scan (WHERE score > N), and the no-WHERE
    /// full-scan branch to confirm the extracted "find target rows"
    /// loop preserves affected-row counts and the resulting row state.
    #[test]
    fn update_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE items (id INT, score INT)")
            .unwrap();
        // Seed ids 0..5 with scores 0, 10, 20, 30, 40.
        for id in 0..5 {
            rt.execute_query(&format!(
                "INSERT INTO items (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
            .unwrap();

        // Indexed equality UPDATE — hits the hash fast-path inside
        // DmlTargetScan::find_target_ids. id=2 has score=20, drop it
        // below the score>25 cutoff so the next assertion stays clean.
        let updated_one = rt
            .execute_query("UPDATE items SET score = 5 WHERE id = 2")
            .unwrap();
        assert_eq!(updated_one.affected_rows, 1);

        // Unindexed scan UPDATE — bumps everyone with score > 25,
        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
        // zoned/full-scan branch.
        let updated_many = rt
            .execute_query("UPDATE items SET score = 7 WHERE score > 25")
            .unwrap();
        assert_eq!(updated_many.affected_rows, 2);

        // Verify the exact per-row outcome of both updates.
        let snapshot = rt
            .execute_query("SELECT id, score FROM items ORDER BY id")
            .unwrap();
        let pairs: Vec<(i64, i64)> = snapshot
            .result
            .records
            .iter()
            .map(|record| {
                let id = match record.get("id").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer id, got {other:?}"),
                };
                let score = match record.get("score").unwrap() {
                    Value::Integer(value) => *value,
                    other => panic!("expected integer score, got {other:?}"),
                };
                (id, score)
            })
            .collect();
        assert_eq!(pairs, vec![(0, 0), (1, 10), (2, 5), (3, 7), (4, 7)]);

        // Full-scan UPDATE with no WHERE rewrites every remaining row.
        let updated_all = rt.execute_query("UPDATE items SET score = 1").unwrap();
        assert_eq!(updated_all.affected_rows, 5);
        let after = rt
            .execute_query("SELECT score FROM items ORDER BY id")
            .unwrap();
        let scores: Vec<i64> = after
            .result
            .records
            .iter()
            .map(|record| match record.get("score").unwrap() {
                Value::Integer(value) => *value,
                other => panic!("expected integer score, got {other:?}"),
            })
            .collect();
        assert_eq!(scores, vec![1, 1, 1, 1, 1]);
    }
2428
2429    /// Drives DELETE through the new `DmlTargetScan` module. Exercises
2430    /// both the index fast-path (WHERE id = N with a HASH index) and
2431    /// the unindexed scan path (WHERE score > N) to confirm the
2432    /// extracted "find target rows" loop preserves the affected-row
2433    /// count and which rows survive.
2434    #[test]
2435    fn delete_routes_through_dml_target_scan_for_indexed_and_scan_paths() {
2436        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2437        rt.execute_query("CREATE TABLE items (id INT, score INT)")
2438            .unwrap();
2439        for id in 0..5 {
2440            rt.execute_query(&format!(
2441                "INSERT INTO items (id, score) VALUES ({id}, {})",
2442                id * 10
2443            ))
2444            .unwrap();
2445        }
2446        rt.execute_query("CREATE INDEX idx_items_id ON items (id) USING HASH")
2447            .unwrap();
2448
2449        // Indexed equality DELETE — hits the hash fast-path inside
2450        // DmlTargetScan::find_target_ids.
2451        let deleted_one = rt.execute_query("DELETE FROM items WHERE id = 2").unwrap();
2452        assert_eq!(deleted_one.affected_rows, 1);
2453
2454        // Unindexed scan DELETE — drops everyone with score > 25,
2455        // i.e. ids 3 and 4 (scores 30, 40). Goes through the
2456        // zoned/full-scan branch.
2457        let deleted_many = rt
2458            .execute_query("DELETE FROM items WHERE score > 25")
2459            .unwrap();
2460        assert_eq!(deleted_many.affected_rows, 2);
2461
2462        let surviving = rt
2463            .execute_query("SELECT id FROM items ORDER BY id")
2464            .unwrap();
2465        let ids: Vec<i64> = surviving
2466            .result
2467            .records
2468            .iter()
2469            .map(|record| match record.get("id").unwrap() {
2470                Value::Integer(value) => *value,
2471                other => panic!("expected integer id, got {other:?}"),
2472            })
2473            .collect();
2474        assert_eq!(ids, vec![0, 1]);
2475
2476        // Sanity: full-scan DELETE with no WHERE clears the rest.
2477        let deleted_rest = rt.execute_query("DELETE FROM items").unwrap();
2478        assert_eq!(deleted_rest.affected_rows, 2);
2479        let empty = rt.execute_query("SELECT id FROM items").unwrap();
2480        assert!(empty.result.records.is_empty());
2481    }
2482
2483    /// CollectionContract gate (#49 + #50): APPEND ONLY tables accept
2484    /// INSERT but reject UPDATE and DELETE with the documented
2485    /// operator-facing error strings. Drives all three DML verbs so
2486    /// the centralized gate is exercised end-to-end.
2487    #[test]
2488    fn collection_contract_gate_blocks_update_and_delete_on_append_only() {
2489        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2490        rt.execute_query("CREATE TABLE events (id INT, payload TEXT) APPEND ONLY")
2491            .unwrap();
2492
2493        // INSERT must succeed — APPEND ONLY exists precisely to allow
2494        // appends. The gate should be a no-op for INSERT.
2495        let inserted = rt
2496            .execute_query("INSERT INTO events (id, payload) VALUES (1, 'hello')")
2497            .unwrap();
2498        assert_eq!(inserted.affected_rows, 1);
2499
2500        // UPDATE is rejected with the gate's UPDATE-specific message.
2501        let update_err = rt
2502            .execute_query("UPDATE events SET payload = 'mut' WHERE id = 1")
2503            .unwrap_err();
2504        let msg = format!("{update_err}");
2505        assert!(
2506            msg.contains("APPEND ONLY") && msg.contains("UPDATE is rejected"),
2507            "expected UPDATE rejection message, got: {msg}"
2508        );
2509
2510        // DELETE is rejected with the gate's DELETE-specific message.
2511        let delete_err = rt
2512            .execute_query("DELETE FROM events WHERE id = 1")
2513            .unwrap_err();
2514        let msg = format!("{delete_err}");
2515        assert!(
2516            msg.contains("APPEND ONLY") && msg.contains("DELETE is rejected"),
2517            "expected DELETE rejection message, got: {msg}"
2518        );
2519
2520        // Row should still be present — neither rejected mutation
2521        // touched storage.
2522        let surviving = rt.execute_query("SELECT id FROM events").unwrap();
2523        assert_eq!(surviving.result.records.len(), 1);
2524    }
2525
2526    /// CollectionContract gate: tables without an APPEND ONLY contract
2527    /// permit INSERT, UPDATE, and DELETE — the gate's default branch
2528    /// is a true pass-through, not an accidental block.
2529    #[test]
2530    fn collection_contract_gate_allows_all_verbs_on_unrestricted_table() {
2531        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2532        rt.execute_query("CREATE TABLE notes (id INT, body TEXT)")
2533            .unwrap();
2534
2535        rt.execute_query("INSERT INTO notes (id, body) VALUES (1, 'a')")
2536            .unwrap();
2537        let updated = rt
2538            .execute_query("UPDATE notes SET body = 'b' WHERE id = 1")
2539            .unwrap();
2540        assert_eq!(updated.affected_rows, 1);
2541        let deleted = rt.execute_query("DELETE FROM notes WHERE id = 1").unwrap();
2542        assert_eq!(deleted.affected_rows, 1);
2543    }
2544
    /// INSERT on a `WITH EVENTS (INSERT) TO <queue>` table emits exactly
    /// one event envelope to the configured queue, carrying the full
    /// field set asserted below (`event_id`, `op`, `collection`, `id`,
    /// `ts`, `lsn`, `tenant`, `before`, `after`).
    #[test]
    fn insert_into_event_enabled_table_emits_event_to_configured_queue() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query(
            "CREATE TABLE users (id INT, email TEXT) WITH EVENTS (INSERT) TO audit_log",
        )
        .unwrap();

        let inserted = rt
            .execute_query("INSERT INTO users (id, email) VALUES (7, 'a@example.com')")
            .unwrap();
        assert_eq!(inserted.affected_rows, 1);

        // One envelope per inserted row lands on the configured queue.
        let events = queue_payloads(&rt, "audit_log");
        assert_eq!(events.len(), 1);
        let event = events[0].as_object().expect("event payload object");
        assert!(event
            .get("event_id")
            .and_then(crate::json::Value::as_str)
            .is_some_and(|value| !value.is_empty()));
        assert_eq!(
            event.get("op").and_then(crate::json::Value::as_str),
            Some("insert")
        );
        assert_eq!(
            event.get("collection").and_then(crate::json::Value::as_str),
            Some("users")
        );
        assert_eq!(
            event.get("id").and_then(crate::json::Value::as_u64),
            Some(7)
        );
        assert!(event
            .get("ts")
            .and_then(crate::json::Value::as_u64)
            .is_some());
        assert!(event
            .get("lsn")
            .and_then(crate::json::Value::as_u64)
            .is_some());
        // No tenant session is active, and an insert has no pre-image:
        // both fields must be explicit JSON nulls, not absent keys.
        assert!(matches!(
            event.get("tenant"),
            Some(crate::json::Value::Null)
        ));
        assert!(matches!(
            event.get("before"),
            Some(crate::json::Value::Null)
        ));
        let after = event
            .get("after")
            .and_then(crate::json::Value::as_object)
            .expect("after object");
        assert_eq!(
            after.get("id").and_then(crate::json::Value::as_u64),
            Some(7)
        );
        assert_eq!(
            after.get("email").and_then(crate::json::Value::as_str),
            Some("a@example.com")
        );
    }
2606
2607    #[test]
2608    fn multi_row_insert_emits_one_insert_event_per_row_in_order() {
2609        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2610        rt.execute_query("CREATE TABLE users (id INT, email TEXT) WITH EVENTS")
2611            .unwrap();
2612
2613        rt.execute_query(
2614            "INSERT INTO users (id, email) VALUES (1, 'a@example.com'), (2, 'b@example.com')",
2615        )
2616        .unwrap();
2617
2618        let events = queue_payloads(&rt, "users_events");
2619        assert_eq!(events.len(), 2);
2620        let mut previous_lsn = 0;
2621        for (event, expected_id) in events.iter().zip([1_u64, 2]) {
2622            let object = event.as_object().expect("event payload object");
2623            assert_eq!(
2624                object.get("op").and_then(crate::json::Value::as_str),
2625                Some("insert")
2626            );
2627            assert_eq!(
2628                object.get("id").and_then(crate::json::Value::as_u64),
2629                Some(expected_id)
2630            );
2631            let lsn = object
2632                .get("lsn")
2633                .and_then(crate::json::Value::as_u64)
2634                .expect("event lsn");
2635            assert!(
2636                lsn > previous_lsn,
2637                "event LSNs should increase in row order"
2638            );
2639            previous_lsn = lsn;
2640            let after = object
2641                .get("after")
2642                .and_then(crate::json::Value::as_object)
2643                .expect("after object");
2644            assert_eq!(
2645                after.get("id").and_then(crate::json::Value::as_u64),
2646                Some(expected_id)
2647            );
2648        }
2649    }
2650
2651    fn queue_payloads(rt: &RedDBRuntime, queue: &str) -> Vec<crate::json::Value> {
2652        let result = rt
2653            .execute_query(&format!("QUEUE PEEK {queue} 10"))
2654            .expect("peek queue");
2655        result
2656            .result
2657            .records
2658            .iter()
2659            .map(
2660                |record| match record.get("payload").expect("payload column") {
2661                    Value::Json(bytes) => crate::json::from_slice(bytes).expect("json payload"),
2662                    other => panic!("expected JSON queue payload, got {other:?}"),
2663                },
2664            )
2665            .collect()
2666    }
2667
2668    // ── #112: auto-index user `id` on first insert ─────────────────────
2669
    /// First insert into a fresh collection that carries a column named
    /// `id` registers an implicit HASH index on `id`. Subsequent inserts
    /// populate it transparently, and `WHERE id = N` lookups exercise
    /// the hash-index fast path in `DmlTargetScan::find_target_ids`.
    ///
    /// This is the load-bearing acceptance test for #112 — without the
    /// hook, `find_index_for_column` returns `None` and DELETE/UPDATE
    /// fall through to a full segment scan (the 4× perf gap documented
    /// in `docs/perf/delete-sequential-2026-05-06.md`).
    #[test]
    fn auto_index_id_fires_on_first_insert() {
        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
        rt.execute_query("CREATE TABLE bench_users (id INT, score INT)")
            .unwrap();

        // Pre-condition: no index on `id` yet.
        assert!(
            rt.index_store_ref()
                .find_index_for_column("bench_users", "id")
                .is_none(),
            "freshly created collection should not have an `id` index"
        );

        // Single-row INSERT — drives `MutationEngine::append_one`.
        rt.execute_query("INSERT INTO bench_users (id, score) VALUES (1, 10)")
            .unwrap();

        // Post-condition: hash index registered on `id`.
        let registered = rt
            .index_store_ref()
            .find_index_for_column("bench_users", "id")
            .expect("auto-index hook should have registered idx_id on first insert");
        assert_eq!(registered.name, "idx_id");
        assert_eq!(registered.collection, "bench_users");
        assert_eq!(registered.columns, vec!["id".to_string()]);
        // The implicit index must be HASH, not BTREE — see the
        // clobbering test below for the explicit-BTREE case.
        assert!(matches!(
            registered.method,
            super::super::index_store::IndexMethodKind::Hash
        ));

        // Subsequent inserts populate the index; `WHERE id = N` should
        // resolve via the hash fast path and round-trip every row.
        for id in 2..=5 {
            rt.execute_query(&format!(
                "INSERT INTO bench_users (id, score) VALUES ({id}, {})",
                id * 10
            ))
            .unwrap();
        }
        for id in 1..=5 {
            let result = rt
                .execute_query(&format!("SELECT score FROM bench_users WHERE id = {id}"))
                .unwrap();
            assert_eq!(
                result.result.records.len(),
                1,
                "id={id} should match one row"
            );
        }

        // Delete via the hash fast-path — exactly the bench scenario the
        // perf doc identified as the 4× regression. With the index
        // present, `find_target_ids` short-circuits before
        // `for_each_entity_zoned` runs.
        let deleted = rt
            .execute_query("DELETE FROM bench_users WHERE id = 3")
            .unwrap();
        assert_eq!(deleted.affected_rows, 1);
    }
2739
2740    /// Bulk INSERT (the multi-row VALUES path) drives
2741    /// `MutationEngine::append_batch`. The hook must fire there too —
2742    /// otherwise the batch entry points (gRPC binary bulk, HTTP bulk,
2743    /// wire bulk INSERT) skip auto-indexing entirely.
2744    #[test]
2745    fn auto_index_id_fires_on_first_bulk_insert() {
2746        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2747        rt.execute_query("CREATE TABLE bench_bulk (id INT, score INT)")
2748            .unwrap();
2749
2750        rt.execute_query("INSERT INTO bench_bulk (id, score) VALUES (1, 10), (2, 20), (3, 30)")
2751            .unwrap();
2752
2753        let registered = rt
2754            .index_store_ref()
2755            .find_index_for_column("bench_bulk", "id")
2756            .expect("auto-index hook should fire on first bulk insert");
2757        assert_eq!(registered.name, "idx_id");
2758
2759        // Every row populated via `index_entity_insert_batch`.
2760        for id in 1..=3 {
2761            let result = rt
2762                .execute_query(&format!("SELECT score FROM bench_bulk WHERE id = {id}"))
2763                .unwrap();
2764            assert_eq!(result.result.records.len(), 1);
2765        }
2766    }
2767
2768    /// Hook is a no-op when the row carries no `id` column. Conservative
2769    /// match (case-sensitive `id`) — `Id`, `ID`, and `red_entity_id`
2770    /// don't trigger it.
2771    #[test]
2772    fn auto_index_id_skips_when_no_id_column() {
2773        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2774        rt.execute_query("CREATE TABLE plain (uid INT, label TEXT)")
2775            .unwrap();
2776        rt.execute_query("INSERT INTO plain (uid, label) VALUES (1, 'a')")
2777            .unwrap();
2778
2779        assert!(rt
2780            .index_store_ref()
2781            .find_index_for_column("plain", "id")
2782            .is_none());
2783        assert!(rt
2784            .index_store_ref()
2785            .find_index_for_column("plain", "uid")
2786            .is_none());
2787    }
2788
2789    /// Hook only fires once per collection. If an explicit
2790    /// `CREATE INDEX ... USING BTREE` already covers `id`, the hook
2791    /// detects it via `find_index_for_column` and does NOT clobber it
2792    /// with a HASH index on the next insert.
2793    #[test]
2794    fn auto_index_id_skips_when_index_already_exists() {
2795        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2796        rt.execute_query("CREATE TABLE pre (id INT, score INT)")
2797            .unwrap();
2798        // User-declared BTREE index on `id` before any insert.
2799        rt.execute_query("CREATE INDEX user_idx ON pre (id) USING BTREE")
2800            .unwrap();
2801        rt.execute_query("INSERT INTO pre (id, score) VALUES (1, 10)")
2802            .unwrap();
2803
2804        let registered = rt
2805            .index_store_ref()
2806            .find_index_for_column("pre", "id")
2807            .expect("user index should still be there");
2808        assert_eq!(
2809            registered.name, "user_idx",
2810            "auto-index hook must not overwrite an existing index"
2811        );
2812    }
2813
2814    /// Implicit `idx_id` is reaped when the collection drops. The
2815    /// existing `execute_drop_table` walks `list_indices` and drops every
2816    /// entry — confirm the auto-created index participates.
2817    #[test]
2818    fn auto_index_id_dropped_with_collection() {
2819        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2820        rt.execute_query("CREATE TABLE ephemeral (id INT, score INT)")
2821            .unwrap();
2822        rt.execute_query("INSERT INTO ephemeral (id, score) VALUES (1, 10)")
2823            .unwrap();
2824        assert!(rt
2825            .index_store_ref()
2826            .find_index_for_column("ephemeral", "id")
2827            .is_some());
2828
2829        rt.execute_query("DROP TABLE ephemeral").unwrap();
2830
2831        assert!(
2832            rt.index_store_ref()
2833                .find_index_for_column("ephemeral", "id")
2834                .is_none(),
2835            "implicit `idx_id` must be reaped when its collection drops"
2836        );
2837    }
2838
2839    /// Opt-out via `RedDBOptions::with_auto_index_id(false)` (which
2840    /// forwards to `UnifiedStoreConfig::auto_index_id`). With the knob
2841    /// off, first insert leaves the collection without an `id` index —
2842    /// DELETE/UPDATE fall back to the scan path.
2843    #[test]
2844    fn auto_index_id_disabled_by_config() {
2845        let opts = RedDBOptions::in_memory().with_auto_index_id(false);
2846        let rt = RedDBRuntime::with_options(opts).unwrap();
2847
2848        rt.execute_query("CREATE TABLE off (id INT, score INT)")
2849            .unwrap();
2850        rt.execute_query("INSERT INTO off (id, score) VALUES (1, 10)")
2851            .unwrap();
2852
2853        assert!(
2854            rt.index_store_ref()
2855                .find_index_for_column("off", "id")
2856                .is_none(),
2857            "with auto_index_id=false, no implicit index should be created"
2858        );
2859    }
2860
2861    // ── #293: UPDATE / DELETE events ─────────────────────────────────────
2862
2863    #[test]
2864    fn update_single_row_emits_update_event() {
2865        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2866        rt.execute_query(
2867            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO audit_log",
2868        )
2869        .unwrap();
2870        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
2871            .unwrap();
2872
2873        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
2874            .unwrap();
2875
2876        let events = queue_payloads(&rt, "audit_log");
2877        assert_eq!(events.len(), 1, "expected exactly 1 update event");
2878        let event = events[0].as_object().expect("event payload object");
2879        assert_eq!(
2880            event.get("op").and_then(crate::json::Value::as_str),
2881            Some("update")
2882        );
2883        assert_eq!(
2884            event.get("collection").and_then(crate::json::Value::as_str),
2885            Some("users")
2886        );
2887        assert!(event
2888            .get("event_id")
2889            .and_then(crate::json::Value::as_str)
2890            .is_some_and(|v| !v.is_empty()));
2891        let before = event
2892            .get("before")
2893            .and_then(crate::json::Value::as_object)
2894            .expect("before must be an object");
2895        let after = event
2896            .get("after")
2897            .and_then(crate::json::Value::as_object)
2898            .expect("after must be an object");
2899        assert_eq!(
2900            before.get("name").and_then(crate::json::Value::as_str),
2901            Some("Alice"),
2902            "before.name should be the old value"
2903        );
2904        assert_eq!(
2905            after.get("name").and_then(crate::json::Value::as_str),
2906            Some("Bob"),
2907            "after.name should be the new value"
2908        );
2909    }
2910
2911    #[test]
2912    fn update_event_only_includes_changed_fields() {
2913        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2914        rt.execute_query(
2915            "CREATE TABLE users (id INT, name TEXT, email TEXT) WITH EVENTS (UPDATE) TO evts",
2916        )
2917        .unwrap();
2918        rt.execute_query("INSERT INTO users (id, name, email) VALUES (1, 'Alice', 'a@x.com')")
2919            .unwrap();
2920
2921        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1")
2922            .unwrap();
2923
2924        let events = queue_payloads(&rt, "evts");
2925        assert_eq!(events.len(), 1);
2926        let event = events[0].as_object().unwrap();
2927        let before = event
2928            .get("before")
2929            .and_then(crate::json::Value::as_object)
2930            .unwrap();
2931        let after = event
2932            .get("after")
2933            .and_then(crate::json::Value::as_object)
2934            .unwrap();
2935        // Only changed field included.
2936        assert!(
2937            before.contains_key("name"),
2938            "before must include changed field"
2939        );
2940        assert!(
2941            after.contains_key("name"),
2942            "after must include changed field"
2943        );
2944        // Unchanged fields must not appear.
2945        assert!(
2946            !before.contains_key("email"),
2947            "before must not include unchanged email"
2948        );
2949        assert!(
2950            !after.contains_key("email"),
2951            "after must not include unchanged email"
2952        );
2953    }
2954
2955    #[test]
2956    fn multi_row_update_emits_one_event_per_row() {
2957        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2958        rt.execute_query("CREATE TABLE items (id INT, status TEXT) WITH EVENTS (UPDATE) TO evts")
2959            .unwrap();
2960        rt.execute_query(
2961            "INSERT INTO items (id, status) VALUES (1, 'new'), (2, 'new'), (3, 'new')",
2962        )
2963        .unwrap();
2964
2965        rt.execute_query("UPDATE items SET status = 'done'")
2966            .unwrap();
2967
2968        let events = queue_payloads(&rt, "evts");
2969        assert_eq!(events.len(), 3, "expected one update event per row");
2970        for event in &events {
2971            let obj = event.as_object().unwrap();
2972            assert_eq!(
2973                obj.get("op").and_then(crate::json::Value::as_str),
2974                Some("update")
2975            );
2976        }
2977    }
2978
2979    #[test]
2980    fn delete_single_row_emits_delete_event() {
2981        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
2982        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (DELETE) TO del_log")
2983            .unwrap();
2984        rt.execute_query("INSERT INTO users (id, name) VALUES (42, 'Alice')")
2985            .unwrap();
2986
2987        rt.execute_query("DELETE FROM users WHERE id = 42").unwrap();
2988
2989        let events = queue_payloads(&rt, "del_log");
2990        assert_eq!(events.len(), 1);
2991        let event = events[0].as_object().expect("event payload object");
2992        assert_eq!(
2993            event.get("op").and_then(crate::json::Value::as_str),
2994            Some("delete")
2995        );
2996        assert_eq!(
2997            event.get("collection").and_then(crate::json::Value::as_str),
2998            Some("users")
2999        );
3000        assert!(event
3001            .get("event_id")
3002            .and_then(crate::json::Value::as_str)
3003            .is_some_and(|v| !v.is_empty()));
3004        let before = event
3005            .get("before")
3006            .and_then(crate::json::Value::as_object)
3007            .expect("before must be an object for delete");
3008        assert_eq!(
3009            before.get("id").and_then(crate::json::Value::as_u64),
3010            Some(42)
3011        );
3012        assert_eq!(
3013            before.get("name").and_then(crate::json::Value::as_str),
3014            Some("Alice")
3015        );
3016        assert!(matches!(event.get("after"), Some(crate::json::Value::Null)));
3017    }
3018
3019    #[test]
3020    fn multi_row_delete_emits_one_event_per_row() {
3021        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3022        rt.execute_query("CREATE TABLE items (id INT, val INT) WITH EVENTS (DELETE) TO del_log")
3023            .unwrap();
3024        rt.execute_query("INSERT INTO items (id, val) VALUES (1, 10), (2, 20), (3, 30)")
3025            .unwrap();
3026
3027        rt.execute_query("DELETE FROM items").unwrap();
3028
3029        let events = queue_payloads(&rt, "del_log");
3030        assert_eq!(events.len(), 3, "expected one delete event per deleted row");
3031        for event in &events {
3032            let obj = event.as_object().unwrap();
3033            assert_eq!(
3034                obj.get("op").and_then(crate::json::Value::as_str),
3035                Some("delete")
3036            );
3037            assert!(matches!(obj.get("after"), Some(crate::json::Value::Null)));
3038        }
3039    }
3040
3041    #[test]
3042    fn ops_filter_update_does_not_emit_on_insert_or_delete() {
3043        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3044        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS (UPDATE) TO evts")
3045            .unwrap();
3046
3047        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
3048            .unwrap();
3049        rt.execute_query("DELETE FROM users WHERE id = 1").unwrap();
3050
3051        let events = queue_payloads(&rt, "evts");
3052        assert!(
3053            events.is_empty(),
3054            "UPDATE-only filter must not emit INSERT or DELETE events"
3055        );
3056    }
3057
3058    // ── SUPPRESS EVENTS ────────────────────────────────────────────────────
3059
3060    #[test]
3061    fn suppress_events_on_insert_emits_no_events() {
3062        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3063        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3064            .unwrap();
3065
3066        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3067            .unwrap();
3068
3069        let events = queue_payloads(&rt, "evts");
3070        assert!(
3071            events.is_empty(),
3072            "SUPPRESS EVENTS must prevent INSERT events"
3073        );
3074    }
3075
3076    #[test]
3077    fn suppress_events_on_update_emits_no_events() {
3078        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3079        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3080            .unwrap();
3081        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice')")
3082            .unwrap();
3083        // drain the INSERT event
3084        let _ = queue_payloads(&rt, "evts");
3085        // Force pop to drain; simpler: just check new count after UPDATE
3086        rt.execute_query("QUEUE PURGE evts").unwrap();
3087
3088        rt.execute_query("UPDATE users SET name = 'Bob' WHERE id = 1 SUPPRESS EVENTS")
3089            .unwrap();
3090
3091        let events = queue_payloads(&rt, "evts");
3092        assert!(
3093            events.is_empty(),
3094            "SUPPRESS EVENTS must prevent UPDATE events"
3095        );
3096    }
3097
3098    #[test]
3099    fn suppress_events_on_delete_emits_no_events() {
3100        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3101        rt.execute_query(
3102            "CREATE TABLE users (id INT, name TEXT) WITH EVENTS (INSERT, DELETE) TO evts",
3103        )
3104        .unwrap();
3105        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3106            .unwrap();
3107
3108        rt.execute_query("DELETE FROM users WHERE id = 1 SUPPRESS EVENTS")
3109            .unwrap();
3110
3111        let events = queue_payloads(&rt, "evts");
3112        assert!(
3113            events.is_empty(),
3114            "SUPPRESS EVENTS must prevent DELETE events"
3115        );
3116    }
3117
3118    #[test]
3119    fn normal_insert_after_suppress_still_emits() {
3120        let rt = RedDBRuntime::with_options(RedDBOptions::in_memory()).unwrap();
3121        rt.execute_query("CREATE TABLE users (id INT, name TEXT) WITH EVENTS TO evts")
3122            .unwrap();
3123
3124        rt.execute_query("INSERT INTO users (id, name) VALUES (1, 'Alice') SUPPRESS EVENTS")
3125            .unwrap();
3126        rt.execute_query("INSERT INTO users (id, name) VALUES (2, 'Bob')")
3127            .unwrap();
3128
3129        let events = queue_payloads(&rt, "evts");
3130        assert_eq!(
3131            events.len(),
3132            1,
3133            "only the non-suppressed INSERT should emit"
3134        );
3135        assert_eq!(
3136            events[0].get("id").and_then(crate::json::Value::as_u64),
3137            Some(2)
3138        );
3139    }
3140}