Skip to main content

reddb_server/application/
ports_impls_entity.rs

1use std::collections::HashMap;
2
3use crate::application::entity::{
4    AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5    RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8    has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21    db: &crate::storage::unified::devx::RedDB,
22    collection: &str,
23    metadata: &mut Vec<(String, MetadataValue)>,
24) {
25    if has_internal_ttl_metadata(metadata) {
26        return;
27    }
28
29    let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30        return;
31    };
32
33    metadata.push((
34        "_ttl_ms".to_string(),
35        if default_ttl_ms <= i64::MAX as u64 {
36            MetadataValue::Int(default_ttl_ms as i64)
37        } else {
38            MetadataValue::Timestamp(default_ttl_ms)
39        },
40    ));
41}
42
43fn refresh_context_index(
44    db: &crate::storage::unified::devx::RedDB,
45    collection: &str,
46    id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48    let store = db.store();
49    let Some(entity) = store.get(collection, id) else {
50        return Ok(());
51    };
52
53    store.context_index().index_entity(collection, &entity);
54    Ok(())
55}
56
57/// Pull `(name, value)` pairs for every named column on a row entity.
58/// Returns empty if the entity is not a row, or if the row carries
59/// neither a `named` map nor a `schema` Arc — both of those mean the
60/// names aren't recoverable here, so secondary-index maintenance has
61/// nothing to act on. Used by the delete + update paths.
62pub(crate) fn entity_row_fields_snapshot(
63    entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65    let crate::storage::EntityData::Row(row) = &entity.data else {
66        return Vec::new();
67    };
68    if let Some(named) = &row.named {
69        return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70    }
71    if let Some(schema) = &row.schema {
72        return schema
73            .iter()
74            .zip(row.columns.iter())
75            .map(|(name, value)| (name.clone(), value.clone()))
76            .collect();
77    }
78    Vec::new()
79}
80
81fn ensure_collection_model_contract(
82    db: &crate::storage::unified::devx::RedDB,
83    collection: &str,
84    requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86    if let Some(contract) = db.collection_contract(collection) {
87        if !contract_enforces_model(&contract) {
88            return Ok(());
89        }
90        if collection_model_allows(contract.declared_model, requested_model) {
91            return Ok(());
92        }
93        // A collection that declares an EMBED policy (#1271/#1272) — or a
94        // VISION policy with an image-embedding output (#1275) — holds its
95        // rows' enrichment vectors in place: the CDC enrichment consumer
96        // attaches vectors into the same collection so `VECTOR SEARCH` over
97        // it surfaces the enriched rows. Permit vector writes on such a
98        // collection even when it is otherwise a strict table.
99        if requested_model == crate::catalog::CollectionModel::Vector
100            && contract
101                .ai_policy
102                .as_ref()
103                .is_some_and(|policy| policy.embed.is_some() || policy.vision.is_some())
104        {
105            return Ok(());
106        }
107        return Err(crate::RedDBError::InvalidOperation(format!(
108            "collection '{}' is declared as '{}' and does not allow '{}' writes",
109            collection,
110            collection_model_name(contract.declared_model),
111            collection_model_name(requested_model)
112        )));
113    }
114
115    let now = implicit_contract_unix_ms();
116    db.save_collection_contract(crate::physical::CollectionContract {
117        name: collection.to_string(),
118        declared_model: requested_model,
119        schema_mode: crate::catalog::SchemaMode::Dynamic,
120        origin: crate::physical::ContractOrigin::Implicit,
121        version: 1,
122        created_at_unix_ms: now,
123        updated_at_unix_ms: now,
124        default_ttl_ms: db.collection_default_ttl_ms(collection),
125        vector_dimension: None,
126        vector_metric: None,
127        context_index_fields: Vec::new(),
128        declared_columns: Vec::new(),
129        table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
130            .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
131        timestamps_enabled: false,
132        context_index_enabled: false,
133        metrics_raw_retention_ms: None,
134        metrics_rollup_policies: Vec::new(),
135        metrics_tenant_identity: None,
136        metrics_namespace: None,
137        // Implicit contracts are created on first write — mutability
138        // is the default until the operator runs explicit DDL.
139        append_only: false,
140        subscriptions: Vec::new(),
141        analytics_config: Vec::new(),
142        session_key: None,
143        session_gap_ms: None,
144        retention_duration_ms: None,
145        analytical_storage: None,
146
147        ai_policy: None,
148    })
149    .map(|_| ())
150    .map_err(|err| crate::RedDBError::Internal(err.to_string()))
151}
152
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
fn implicit_contract_unix_ms() -> u128 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
159
160fn collection_model_allows(
161    declared_model: crate::catalog::CollectionModel,
162    requested_model: crate::catalog::CollectionModel,
163) -> bool {
164    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
165}
166
167fn ensure_vector_dimension_contract(
168    db: &crate::storage::unified::devx::RedDB,
169    collection: &str,
170    actual_dimension: usize,
171) -> RedDBResult<()> {
172    let Some(expected_dimension) = db
173        .collection_contract(collection)
174        .and_then(|contract| contract.vector_dimension)
175    else {
176        return Ok(());
177    };
178    if expected_dimension == actual_dimension {
179        return Ok(());
180    }
181    Err(crate::RedDBError::Query(format!(
182        "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
183    )))
184}
185
186fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
187    match model {
188        crate::catalog::CollectionModel::Table => "table",
189        crate::catalog::CollectionModel::Document => "document",
190        crate::catalog::CollectionModel::Graph => "graph",
191        crate::catalog::CollectionModel::Vector => "vector",
192        crate::catalog::CollectionModel::Hll => "hll",
193        crate::catalog::CollectionModel::Sketch => "sketch",
194        crate::catalog::CollectionModel::Filter => "filter",
195        crate::catalog::CollectionModel::Kv => "kv",
196        crate::catalog::CollectionModel::Config => "config",
197        crate::catalog::CollectionModel::Vault => "vault",
198        crate::catalog::CollectionModel::Mixed => "mixed",
199        crate::catalog::CollectionModel::TimeSeries => "timeseries",
200        crate::catalog::CollectionModel::Queue => "queue",
201        crate::catalog::CollectionModel::Metrics => "metrics",
202    }
203}
204
/// A single uniqueness constraint resolved from a collection contract
/// (see `resolved_uniqueness_rules`).
#[derive(Clone)]
struct UniquenessRule {
    /// Constraint name, echoed in uniqueness-violation error messages.
    name: String,
    /// Columns whose combined values must be unique across rows.
    columns: Vec<String>,
    /// `true` for primary keys: a null or missing column is then an error,
    /// whereas for plain unique rules it makes the rule skip the row.
    primary_key: bool,
}
211
/// Selects how `normalize_row_fields_for_contract_with_mode` treats the
/// runtime-managed `created_at` / `updated_at` columns when the contract
/// has timestamps enabled.
#[derive(Copy, Clone, PartialEq, Eq)]
enum NormalizeMode {
    /// First write for this row. Timestamps auto-filled from now on
    /// both `created_at` and `updated_at`; user attempts to set
    /// either column are rejected.
    Insert,
    /// Update/patch path. A `created_at` value present in the incoming
    /// fields is carried through unchanged; `updated_at` is bumped to now.
    /// The normalizer overwrites rather than rejects values supplied for
    /// these columns; explicit UPDATE assignments to them are rejected
    /// separately by `normalize_row_update_assignment_with_plan`.
    Update,
}
223
/// Groups the contract-enforcement helpers of this file behind a single
/// object bound to one database handle and one collection name.
mod collection_contract_enforcement {
    use super::*;

    /// Thin facade over the free contract-enforcement functions; every
    /// method forwards to its counterpart with the stored `db` and
    /// `collection`.
    pub(super) struct CollectionContractWriteEnforcer<'a> {
        db: &'a crate::storage::unified::devx::RedDB,
        collection: &'a str,
    }

    impl<'a> CollectionContractWriteEnforcer<'a> {
        /// Binds the enforcer to `db` and `collection`.
        pub(super) fn new(
            db: &'a crate::storage::unified::devx::RedDB,
            collection: &'a str,
        ) -> Self {
            Self { db, collection }
        }

        /// Delegates to `ensure_collection_model_contract`: checks that the
        /// collection allows `requested_model` writes, creating an implicit
        /// contract when none exists.
        pub(super) fn ensure_model(
            &self,
            requested_model: crate::catalog::CollectionModel,
        ) -> RedDBResult<()> {
            ensure_collection_model_contract(self.db, self.collection, requested_model)
        }

        /// Normalizes `fields` against the contract in `NormalizeMode::Insert`.
        pub(super) fn normalize_insert_fields(
            &self,
            fields: Vec<(String, Value)>,
        ) -> RedDBResult<Vec<(String, Value)>> {
            normalize_row_fields_for_contract_with_mode(
                self.db,
                self.collection,
                fields,
                NormalizeMode::Insert,
            )
        }

        /// Normalizes `fields` against the contract in `NormalizeMode::Update`.
        pub(super) fn normalize_update_fields(
            &self,
            fields: Vec<(String, Value)>,
        ) -> RedDBResult<Vec<(String, Value)>> {
            normalize_row_fields_for_contract_with_mode(
                self.db,
                self.collection,
                fields,
                NormalizeMode::Update,
            )
        }

        /// Delegates to `enforce_row_uniqueness`: checks one row against the
        /// stored rows, ignoring the entity `exclude_id` when given.
        pub(super) fn enforce_row_uniqueness(
            &self,
            fields: &[(String, Value)],
            exclude_id: Option<crate::storage::EntityId>,
        ) -> RedDBResult<()> {
            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
        }

        /// Delegates to `enforce_row_batch_uniqueness`: checks the rows of a
        /// batch against each other.
        pub(super) fn enforce_batch_uniqueness(
            &self,
            rows: &[Vec<(String, Value)>],
        ) -> RedDBResult<()> {
            enforce_row_batch_uniqueness(self.db, self.collection, rows)
        }

        /// Delegates to `row_update_requires_uniqueness_check`: reports
        /// whether any of `modified_columns` belongs to a uniqueness rule.
        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
        }
    }
}
291
292use collection_contract_enforcement::CollectionContractWriteEnforcer;
293
/// Normalizes a row's `fields` against the collection's declared table
/// contract.
///
/// The fields are returned untouched when the collection has no contract,
/// is not declared as a table, or declares no columns. Otherwise the output
/// lists the declared columns in contract order — each provided value passed
/// through `normalize_contract_value`, missing ones filled from the column
/// default or, for runtime-managed timestamps, from the current time —
/// followed by any undeclared fields in their original order.
///
/// # Errors
///
/// Returns `RedDBError::Query` when:
/// * an INSERT sets `created_at` / `updated_at` on a timestamp-managed
///   collection;
/// * the contract is strict and `fields` contains undeclared names;
/// * a `not_null` column without a default is missing;
/// * value normalization / default coercion fails.
fn normalize_row_fields_for_contract_with_mode(
    db: &crate::storage::unified::devx::RedDB,
    collection: &str,
    fields: Vec<(String, Value)>,
    mode: NormalizeMode,
) -> RedDBResult<Vec<(String, Value)>> {
    let Some(contract) = db.collection_contract(collection) else {
        return Ok(fields);
    };

    // Only table collections that actually declare columns (either via
    // `declared_columns` or a non-empty `table_def`) are normalized.
    if contract.declared_model != crate::catalog::CollectionModel::Table
        || (contract.declared_columns.is_empty()
            && contract
                .table_def
                .as_ref()
                .map(|table| table.columns.is_empty())
                .unwrap_or(true))
    {
        return Ok(fields);
    }

    // In Update mode, remember the `created_at` value carried by the
    // incoming fields so it can be written back unchanged further down.
    //
    // The value is taken from `fields` as-is; this function does not
    // compare it against the stored entity.
    // NOTE(review): this presumably relies on the patch pipeline passing
    // the existing row's fields forward — confirm against the callers.
    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
        fields
            .iter()
            .find(|(n, _)| n == "created_at")
            .map(|(_, v)| v.clone())
    } else {
        None
    };

    // Reject user attempts to set runtime-managed timestamp columns on
    // Insert. Update mode is not rejected here: supplied values are
    // overwritten with runtime-owned ones in the column loop below.
    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
        for (name, _) in &fields {
            if name == "created_at" || name == "updated_at" {
                return Err(crate::RedDBError::Query(format!(
                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
                    collection, name
                )));
            }
        }
    }

    // Name -> value lookup of the provided fields; when a name repeats,
    // the last occurrence wins.
    let mut provided = std::collections::BTreeMap::new();
    for (name, value) in &fields {
        provided.insert(name.clone(), value.clone());
    }

    let resolved_columns = resolved_contract_columns(&contract)?;
    let declared_names: std::collections::BTreeSet<String> = resolved_columns
        .iter()
        .map(|column| column.name.clone())
        .collect();
    let unknown_fields: Vec<String> = fields
        .iter()
        .filter(|(name, _)| !declared_names.contains(name))
        .map(|(name, _)| name.clone())
        .collect();
    // Strict schemas refuse any field the contract does not declare.
    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
        && !unknown_fields.is_empty()
    {
        return Err(crate::RedDBError::Query(format!(
            "collection '{}' is strict and does not allow undeclared fields: {}",
            collection,
            unknown_fields.join(", ")
        )));
    }
    let mut normalized = Vec::new();
    let now_ms = current_unix_ms_u64();

    // Emit the declared columns in contract order.
    for column in &resolved_columns {
        match provided.remove(&column.name) {
            Some(value) => {
                // Runtime-managed columns on Update: the incoming value is
                // replaced by the runtime's own value (the carried-forward
                // created_at, or a fresh updated_at). No error is raised
                // for a supplied value on this path.
                if contract.timestamps_enabled && mode == NormalizeMode::Update {
                    match column.name.as_str() {
                        "created_at" => {
                            normalized.push((
                                column.name.clone(),
                                existing_created_at
                                    .clone()
                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
                            ));
                            continue;
                        }
                        "updated_at" => {
                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
                            continue;
                        }
                        _ => {}
                    }
                }
                normalized.push((
                    column.name.clone(),
                    normalize_contract_value(collection, column, value)?,
                ));
            }
            None => {
                // Runtime-managed timestamp columns: auto-fill with now
                // when the contract opted in. Both get the same value on
                // first insert so callers can order by either.
                // NOTE(review): this branch also runs in Update mode, so an
                // update whose fields lack `created_at` gets a fresh
                // `created_at` — confirm callers always carry it forward.
                if contract.timestamps_enabled
                    && (column.name == "created_at" || column.name == "updated_at")
                {
                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
                    continue;
                }
                // Missing column: use the declared default when there is
                // one; a nullable column without a default is omitted.
                if let Some(default) = &column.default {
                    normalized.push((
                        column.name.clone(),
                        coerce_contract_literal(collection, &column.name, column, default)?,
                    ));
                } else if column.not_null {
                    return Err(crate::RedDBError::Query(format!(
                        "missing required column '{}' for collection '{}'",
                        column.name, collection
                    )));
                }
            }
        }
    }

    // Undeclared fields (only reachable for non-strict schemas) are kept
    // unmodified, appended after the declared columns in input order.
    for (name, value) in fields {
        if !declared_names.contains(&name) {
            normalized.push((name, value));
        }
    }

    Ok(normalized)
}
438
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. The
/// `u128` millisecond count is clamped to `u64::MAX` before narrowing, so an
/// out-of-range value saturates instead of silently wrapping.
fn current_unix_ms_u64() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // The clamp guarantees the cast below cannot truncate.
        .map(|elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
        .unwrap_or(0)
}
445
446fn enforce_row_uniqueness(
447    db: &crate::storage::unified::devx::RedDB,
448    collection: &str,
449    fields: &[(String, Value)],
450    exclude_id: Option<crate::storage::EntityId>,
451) -> RedDBResult<()> {
452    let Some(contract) = db.collection_contract(collection) else {
453        return Ok(());
454    };
455    if contract.declared_model != crate::catalog::CollectionModel::Table
456        && contract.declared_model != crate::catalog::CollectionModel::Mixed
457    {
458        return Ok(());
459    }
460
461    let rules = resolved_uniqueness_rules(&contract);
462    if rules.is_empty() {
463        return Ok(());
464    }
465
466    let store = db.store();
467    let Some(manager) = store.get_collection(collection) else {
468        return Ok(());
469    };
470
471    let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
472
473    for rule in &rules {
474        let mut expected_signatures = Vec::new();
475        let mut skip_rule = false;
476
477        for column in &rule.columns {
478            match input_fields.get(column) {
479                Some(Value::Null) | None if rule.primary_key => {
480                    return Err(crate::RedDBError::Query(format!(
481                        "primary key '{}' in collection '{}' requires non-null column '{}'",
482                        rule.name, collection, column
483                    )))
484                }
485                Some(Value::Null) | None => {
486                    skip_rule = true;
487                    break;
488                }
489                Some(value) => {
490                    expected_signatures.push((column.clone(), value_signature(value)));
491                }
492            }
493        }
494
495        if skip_rule {
496            continue;
497        }
498
499        for entity in manager.query_all(|_| true) {
500            if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
501                continue;
502            }
503            let Some(existing_fields) = row_fields_from_entity(&entity) else {
504                continue;
505            };
506
507            let duplicate = expected_signatures.iter().all(|(column, expected)| {
508                existing_fields
509                    .get(column)
510                    .map(|value| value_signature(value) == *expected)
511                    .unwrap_or(false)
512            });
513
514            if duplicate {
515                let qualifier = if rule.primary_key {
516                    "primary key"
517                } else {
518                    "unique constraint"
519                };
520                return Err(crate::RedDBError::Query(format!(
521                    "{} '{}' violated on collection '{}' for columns [{}]",
522                    qualifier,
523                    rule.name,
524                    collection,
525                    rule.columns.join(", ")
526                )));
527            }
528        }
529    }
530
531    Ok(())
532}
533
534fn enforce_row_batch_uniqueness(
535    db: &crate::storage::unified::devx::RedDB,
536    collection: &str,
537    rows: &[Vec<(String, Value)>],
538) -> RedDBResult<()> {
539    let Some(contract) = db.collection_contract(collection) else {
540        return Ok(());
541    };
542    if contract.declared_model != crate::catalog::CollectionModel::Table
543        && contract.declared_model != crate::catalog::CollectionModel::Mixed
544    {
545        return Ok(());
546    }
547
548    let rules = resolved_uniqueness_rules(&contract);
549    if rules.is_empty() {
550        return Ok(());
551    }
552
553    for rule in &rules {
554        let mut seen = std::collections::HashMap::<String, usize>::new();
555        for (row_index, fields) in rows.iter().enumerate() {
556            let input_fields: std::collections::BTreeMap<String, Value> =
557                fields.iter().cloned().collect();
558            let mut signatures = Vec::new();
559            let mut skip_rule = false;
560
561            for column in &rule.columns {
562                match input_fields.get(column) {
563                    Some(Value::Null) | None if rule.primary_key => {
564                        return Err(crate::RedDBError::Query(format!(
565                            "primary key '{}' in collection '{}' requires non-null column '{}'",
566                            rule.name, collection, column
567                        )))
568                    }
569                    Some(Value::Null) | None => {
570                        skip_rule = true;
571                        break;
572                    }
573                    Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
574                }
575            }
576
577            if skip_rule {
578                continue;
579            }
580
581            let signature = signatures.join("|");
582            if let Some(previous_index) = seen.insert(signature, row_index) {
583                return Err(crate::RedDBError::Query(format!(
584                    "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
585                    rule.name,
586                    collection,
587                    previous_index + 1,
588                    row_index + 1
589                )));
590            }
591        }
592    }
593
594    Ok(())
595}
596
597fn row_update_requires_uniqueness_check(
598    db: &crate::storage::unified::devx::RedDB,
599    collection: &str,
600    modified_columns: &[String],
601) -> bool {
602    if modified_columns.is_empty() {
603        return false;
604    }
605
606    let Some(contract) = db.collection_contract(collection) else {
607        return false;
608    };
609    if contract.declared_model != crate::catalog::CollectionModel::Table
610        && contract.declared_model != crate::catalog::CollectionModel::Mixed
611    {
612        return false;
613    }
614
615    let rules = resolved_uniqueness_rules(&contract);
616    if rules.is_empty() {
617        return false;
618    }
619
620    rules.iter().any(|rule| {
621        rule.columns.iter().any(|column| {
622            modified_columns
623                .iter()
624                .any(|modified| modified.eq_ignore_ascii_case(column))
625        })
626    })
627}
628
629pub(crate) fn build_row_update_contract_plan(
630    db: &crate::storage::unified::devx::RedDB,
631    collection: &str,
632) -> RedDBResult<Option<RowUpdateContractPlan>> {
633    let Some(contract) = db.collection_contract(collection) else {
634        return Ok(None);
635    };
636
637    let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
638        && !(contract.declared_columns.is_empty()
639            && contract
640                .table_def
641                .as_ref()
642                .map(|table| table.columns.is_empty())
643                .unwrap_or(true))
644    {
645        resolved_contract_columns(&contract)?
646            .into_iter()
647            .map(|rule| {
648                (
649                    rule.name.clone(),
650                    RowUpdateColumnRule {
651                        name: rule.name,
652                        data_type: rule.data_type,
653                        data_type_name: rule.data_type_name,
654                        not_null: rule.not_null,
655                        enum_variants: rule.enum_variants,
656                    },
657                )
658            })
659            .collect()
660    } else {
661        HashMap::new()
662    };
663
664    let unique_columns = if matches!(
665        contract.declared_model,
666        crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
667    ) {
668        resolved_uniqueness_rules(&contract)
669            .into_iter()
670            .flat_map(|rule| rule.columns.into_iter())
671            .map(|column| (column, ()))
672            .collect()
673    } else {
674        HashMap::new()
675    };
676
677    Ok(Some(RowUpdateContractPlan {
678        timestamps_enabled: contract.timestamps_enabled,
679        strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
680        declared_rules,
681        unique_columns,
682    }))
683}
684
685pub(crate) fn normalize_row_update_assignment_with_plan(
686    collection: &str,
687    column: &str,
688    value: Value,
689    row_contract_plan: Option<&RowUpdateContractPlan>,
690) -> RedDBResult<Value> {
691    let Some(plan) = row_contract_plan else {
692        return Ok(value);
693    };
694
695    if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
696        return Err(crate::RedDBError::Query(format!(
697            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
698            collection, column
699        )));
700    }
701
702    if let Some(rule) = plan.declared_rules.get(column) {
703        let rule = ResolvedColumnRule {
704            name: rule.name.clone(),
705            data_type: rule.data_type,
706            data_type_name: rule.data_type_name.clone(),
707            not_null: rule.not_null,
708            default: None,
709            enum_variants: rule.enum_variants.clone(),
710        };
711        normalize_contract_value(collection, &rule, value)
712    } else if plan.strict_schema {
713        Err(crate::RedDBError::Query(format!(
714            "collection '{}' is strict and does not allow undeclared fields: {}",
715            collection, column
716        )))
717    } else {
718        Ok(value)
719    }
720}
721
722pub(crate) fn normalize_row_update_value_for_rule(
723    collection: &str,
724    value: Value,
725    row_rule: Option<&RowUpdateColumnRule>,
726) -> RedDBResult<Value> {
727    let Some(rule) = row_rule else {
728        return Ok(value);
729    };
730
731    let rule = ResolvedColumnRule {
732        name: rule.name.clone(),
733        data_type: rule.data_type,
734        data_type_name: rule.data_type_name.clone(),
735        not_null: rule.not_null,
736        default: None,
737        enum_variants: rule.enum_variants.clone(),
738    };
739    normalize_contract_value(collection, &rule, value)
740}
741
742fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
743    if let Some(named) = row.named.as_mut() {
744        named.insert(name.to_string(), value);
745        return;
746    }
747
748    if let Some(schema) = row.schema.as_ref() {
749        if let Some(index) = schema.iter().position(|column| column == name) {
750            if let Some(slot) = row.columns.get_mut(index) {
751                *slot = value;
752                return;
753            }
754        }
755
756        let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
757        for (column, current) in schema.iter().zip(row.columns.iter()) {
758            named.insert(column.clone(), current.clone());
759        }
760        named.insert(name.to_string(), value);
761        row.named = Some(named);
762        return;
763    }
764
765    let mut named = HashMap::with_capacity(1);
766    named.insert(name.to_string(), value);
767    row.named = Some(named);
768}
769
770fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
771    if let Some(named) = row.named.as_ref() {
772        named
773            .iter()
774            .map(|(key, value)| (key.clone(), value.clone()))
775            .collect()
776    } else if let Some(schema) = row.schema.as_ref() {
777        schema
778            .iter()
779            .cloned()
780            .zip(row.columns.iter().cloned())
781            .collect()
782    } else {
783        Vec::new()
784    }
785}
786
787fn apply_row_field_assignments_raw<I>(
788    row: &mut crate::storage::unified::entity::RowData,
789    field_assignments: I,
790) where
791    I: IntoIterator<Item = (String, Value)>,
792{
793    for (column, value) in field_assignments {
794        set_row_field(row, &column, value);
795    }
796}
797
798fn apply_row_field_assignments_incremental<I>(
799    collection: &str,
800    row: &mut crate::storage::unified::entity::RowData,
801    field_assignments: I,
802    row_contract_plan: Option<&RowUpdateContractPlan>,
803) -> RedDBResult<()>
804where
805    I: IntoIterator<Item = (String, Value)>,
806{
807    for (column, value) in field_assignments {
808        let value = normalize_row_update_assignment_with_plan(
809            collection,
810            &column,
811            value,
812            row_contract_plan,
813        )?;
814
815        set_row_field(row, &column, value);
816    }
817
818    Ok(())
819}
820
821fn resolved_uniqueness_rules(
822    contract: &crate::physical::CollectionContract,
823) -> Vec<UniquenessRule> {
824    let mut rules = Vec::new();
825
826    if let Some(table_def) = &contract.table_def {
827        if !table_def.primary_key.is_empty() {
828            rules.push(UniquenessRule {
829                name: "primary_key".to_string(),
830                columns: table_def.primary_key.clone(),
831                primary_key: true,
832            });
833        }
834
835        for constraint in &table_def.constraints {
836            if matches!(
837                constraint.constraint_type,
838                crate::storage::schema::ConstraintType::PrimaryKey
839            ) && !constraint.columns.is_empty()
840            {
841                rules.push(UniquenessRule {
842                    name: constraint.name.clone(),
843                    columns: constraint.columns.clone(),
844                    primary_key: true,
845                });
846            } else if matches!(
847                constraint.constraint_type,
848                crate::storage::schema::ConstraintType::Unique
849            ) && !constraint.columns.is_empty()
850            {
851                rules.push(UniquenessRule {
852                    name: constraint.name.clone(),
853                    columns: constraint.columns.clone(),
854                    primary_key: false,
855                });
856            }
857        }
858    } else {
859        for column in &contract.declared_columns {
860            if column.primary_key {
861                rules.push(UniquenessRule {
862                    name: format!("pk_{}", column.name),
863                    columns: vec![column.name.clone()],
864                    primary_key: true,
865                });
866            } else if column.unique {
867                rules.push(UniquenessRule {
868                    name: format!("uniq_{}", column.name),
869                    columns: vec![column.name.clone()],
870                    primary_key: false,
871                });
872            }
873        }
874    }
875
876    let mut dedup = std::collections::BTreeSet::new();
877    rules
878        .into_iter()
879        .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
880        .collect()
881}
882
883fn row_fields_from_entity(
884    entity: &crate::storage::UnifiedEntity,
885) -> Option<std::collections::BTreeMap<String, Value>> {
886    match &entity.data {
887        crate::storage::EntityData::Row(row) => {
888            if let Some(named) = &row.named {
889                Some(
890                    named
891                        .iter()
892                        .map(|(key, value)| (key.clone(), value.clone()))
893                        .collect(),
894                )
895            } else {
896                row.schema.as_ref().map(|schema| {
897                    schema
898                        .iter()
899                        .cloned()
900                        .zip(row.columns.iter().cloned())
901                        .collect()
902                })
903            }
904        }
905        _ => None,
906    }
907}
908
909fn value_signature(value: &Value) -> String {
910    format!("{value:?}")
911}
912
913fn normalize_contract_value(
914    collection: &str,
915    column: &ResolvedColumnRule,
916    value: Value,
917) -> RedDBResult<Value> {
918    if matches!(value, Value::Null) {
919        if column.not_null {
920            return Err(crate::RedDBError::Query(format!(
921                "column '{}' in collection '{}' cannot be null",
922                column.name, collection
923            )));
924        }
925        return Ok(Value::Null);
926    }
927
928    let target = column.data_type;
929    if value_matches_declared_type(&value, target) {
930        return Ok(value);
931    }
932
933    let Some(raw) = value_to_coercion_input(&value) else {
934        return Err(crate::RedDBError::Query(format!(
935            "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
936            column.name, collection, column.data_type_name
937        )));
938    };
939
940    coerce_contract_literal(collection, &column.name, column, &raw)
941}
942
943fn coerce_contract_literal(
944    collection: &str,
945    column_name: &str,
946    column: &ResolvedColumnRule,
947    raw: &str,
948) -> RedDBResult<Value> {
949    let target = column.data_type;
950    match target {
951        DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
952        DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
953        DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
954            crate::RedDBError::Query(format!(
955                "failed to coerce column '{}' in collection '{}' to '{}': {}",
956                column_name, collection, column.data_type_name, err
957            ))
958        }),
959        DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
960            crate::RedDBError::Query(format!(
961                "failed to coerce column '{}' in collection '{}' to '{}': {}",
962                column_name, collection, column.data_type_name, err
963            ))
964        }),
965        DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
966            "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
967            column_name, collection, column.data_type_name
968        ))),
969        _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
970            |err| {
971                crate::RedDBError::Query(format!(
972                    "failed to coerce column '{}' in collection '{}' to '{}': {}",
973                    column_name, collection, column.data_type_name, err
974                ))
975            },
976        ),
977    }
978}
979
980struct ResolvedColumnRule {
981    name: String,
982    data_type: DataType,
983    data_type_name: String,
984    not_null: bool,
985    default: Option<String>,
986    enum_variants: Vec<String>,
987}
988
989fn resolved_contract_columns(
990    contract: &crate::physical::CollectionContract,
991) -> RedDBResult<Vec<ResolvedColumnRule>> {
992    if let Some(table_def) = &contract.table_def {
993        return Ok(table_def
994            .columns
995            .iter()
996            .map(|column| ResolvedColumnRule {
997                name: column.name.clone(),
998                data_type: column.data_type,
999                data_type_name: data_type_name(column.data_type).to_string(),
1000                not_null: !column.nullable,
1001                default: column
1002                    .default
1003                    .as_ref()
1004                    .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
1005                enum_variants: column.enum_variants.clone(),
1006            })
1007            .collect());
1008    }
1009
1010    contract
1011        .declared_columns
1012        .iter()
1013        .map(|column| {
1014            let data_type = column
1015                .sql_type
1016                .as_ref()
1017                .map(crate::storage::query::resolve_sql_type_name)
1018                .transpose()
1019                .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1020                .unwrap_or(parse_declared_data_type(&column.data_type)?);
1021            Ok(ResolvedColumnRule {
1022                name: column.name.clone(),
1023                data_type,
1024                data_type_name: column.data_type.clone(),
1025                not_null: column.not_null,
1026                default: column.default.clone(),
1027                enum_variants: column.enum_variants.clone(),
1028            })
1029        })
1030        .collect()
1031}
1032
1033fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1034    resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1035}
1036
1037fn data_type_name(data_type: DataType) -> &'static str {
1038    match data_type {
1039        DataType::Integer => "integer",
1040        DataType::UnsignedInteger => "unsigned_integer",
1041        DataType::Float => "float",
1042        DataType::Text => "text",
1043        DataType::Blob => "blob",
1044        DataType::Boolean => "boolean",
1045        DataType::Timestamp => "timestamp",
1046        DataType::Duration => "duration",
1047        DataType::IpAddr => "ipaddr",
1048        DataType::MacAddr => "macaddr",
1049        DataType::Vector => "vector",
1050        DataType::Nullable => "nullable",
1051        DataType::Unknown => "unknown",
1052        DataType::Json => "json",
1053        DataType::Uuid => "uuid",
1054        DataType::NodeRef => "noderef",
1055        DataType::EdgeRef => "edgeref",
1056        DataType::VectorRef => "vectorref",
1057        DataType::RowRef => "rowref",
1058        DataType::Color => "color",
1059        DataType::Email => "email",
1060        DataType::Url => "url",
1061        DataType::Phone => "phone",
1062        DataType::Semver => "semver",
1063        DataType::Cidr => "cidr",
1064        DataType::Date => "date",
1065        DataType::Time => "time",
1066        DataType::Decimal => "decimal",
1067        DataType::Enum => "enum",
1068        DataType::Array => "array",
1069        DataType::TimestampMs => "timestamp_ms",
1070        DataType::Ipv4 => "ipv4",
1071        DataType::Ipv6 => "ipv6",
1072        DataType::Subnet => "subnet",
1073        DataType::Port => "port",
1074        DataType::Latitude => "latitude",
1075        DataType::Longitude => "longitude",
1076        DataType::GeoPoint => "geopoint",
1077        DataType::Country2 => "country2",
1078        DataType::Country3 => "country3",
1079        DataType::Lang2 => "lang2",
1080        DataType::Lang5 => "lang5",
1081        DataType::Currency => "currency",
1082        DataType::AssetCode => "asset_code",
1083        DataType::Money => "money",
1084        DataType::ColorAlpha => "color_alpha",
1085        DataType::BigInt => "bigint",
1086        DataType::KeyRef => "keyref",
1087        DataType::DocRef => "docref",
1088        DataType::TableRef => "tableref",
1089        DataType::PageRef => "pageref",
1090        DataType::Secret => "secret",
1091        DataType::Password => "password",
1092        DataType::TextZstd => "text",
1093        DataType::BlobZstd => "blob",
1094    }
1095}
1096
1097fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1098    matches!(
1099        (value, target),
1100        (Value::Null, _)
1101            | (Value::Integer(_), DataType::Integer)
1102            | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1103            | (Value::Float(_), DataType::Float)
1104            | (Value::Text(_), DataType::Text)
1105            | (Value::Blob(_), DataType::Blob)
1106            | (Value::Boolean(_), DataType::Boolean)
1107            | (Value::Timestamp(_), DataType::Timestamp)
1108            | (Value::Duration(_), DataType::Duration)
1109            | (Value::IpAddr(_), DataType::IpAddr)
1110            | (Value::MacAddr(_), DataType::MacAddr)
1111            | (Value::Vector(_), DataType::Vector)
1112            | (Value::Json(_), DataType::Json)
1113            | (Value::Uuid(_), DataType::Uuid)
1114            | (Value::NodeRef(_), DataType::NodeRef)
1115            | (Value::EdgeRef(_), DataType::EdgeRef)
1116            | (Value::VectorRef(_, _), DataType::VectorRef)
1117            | (Value::RowRef(_, _), DataType::RowRef)
1118            | (Value::Color(_), DataType::Color)
1119            | (Value::Email(_), DataType::Email)
1120            | (Value::Url(_), DataType::Url)
1121            | (Value::Phone(_), DataType::Phone)
1122            | (Value::Semver(_), DataType::Semver)
1123            | (Value::Cidr(_, _), DataType::Cidr)
1124            | (Value::Date(_), DataType::Date)
1125            | (Value::Time(_), DataType::Time)
1126            | (Value::Decimal(_), DataType::Decimal)
1127            | (Value::EnumValue(_), DataType::Enum)
1128            | (Value::Array(_), DataType::Array)
1129            | (Value::TimestampMs(_), DataType::TimestampMs)
1130            | (Value::Ipv4(_), DataType::Ipv4)
1131            | (Value::Ipv6(_), DataType::Ipv6)
1132            | (Value::Subnet(_, _), DataType::Subnet)
1133            | (Value::Port(_), DataType::Port)
1134            | (Value::Latitude(_), DataType::Latitude)
1135            | (Value::Longitude(_), DataType::Longitude)
1136            | (Value::GeoPoint(_, _), DataType::GeoPoint)
1137            | (Value::Country2(_), DataType::Country2)
1138            | (Value::Country3(_), DataType::Country3)
1139            | (Value::Lang2(_), DataType::Lang2)
1140            | (Value::Lang5(_), DataType::Lang5)
1141            | (Value::Currency(_), DataType::Currency)
1142            | (Value::ColorAlpha(_), DataType::ColorAlpha)
1143            | (Value::BigInt(_), DataType::BigInt)
1144            | (Value::KeyRef(_, _), DataType::KeyRef)
1145            | (Value::DocRef(_, _), DataType::DocRef)
1146            | (Value::TableRef(_), DataType::TableRef)
1147            | (Value::PageRef(_), DataType::PageRef)
1148            | (Value::Secret(_), DataType::Secret)
1149            | (Value::Password(_), DataType::Password)
1150    )
1151}
1152
1153fn value_to_coercion_input(value: &Value) -> Option<String> {
1154    match value {
1155        Value::Null => None,
1156        Value::Integer(value) => Some(value.to_string()),
1157        Value::UnsignedInteger(value) => Some(value.to_string()),
1158        Value::Float(value) => Some(value.to_string()),
1159        Value::Text(value) => Some(value.to_string()),
1160        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1161        Value::Boolean(value) => Some(value.to_string()),
1162        Value::Timestamp(value) => Some(value.to_string()),
1163        Value::Duration(value) => Some(value.to_string()),
1164        Value::IpAddr(value) => Some(value.to_string()),
1165        Value::MacAddr(value) => Some(format!(
1166            "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1167            value[0], value[1], value[2], value[3], value[4], value[5]
1168        )),
1169        Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1170        Value::Email(value) => Some(value.clone()),
1171        Value::Url(value) => Some(value.clone()),
1172        Value::Phone(value) => Some(value.to_string()),
1173        Value::Semver(value) => Some(format!(
1174            "{}.{}.{}",
1175            value / 1_000_000,
1176            (value / 1_000) % 1_000,
1177            value % 1_000
1178        )),
1179        Value::Date(value) => Some(value.to_string()),
1180        Value::Time(value) => Some(value.to_string()),
1181        Value::Decimal(value) => Some(value.to_string()),
1182        Value::TimestampMs(value) => Some(value.to_string()),
1183        Value::Ipv4(value) => Some(format!(
1184            "{}.{}.{}.{}",
1185            (value >> 24) & 0xFF,
1186            (value >> 16) & 0xFF,
1187            (value >> 8) & 0xFF,
1188            value & 0xFF
1189        )),
1190        Value::Port(value) => Some(value.to_string()),
1191        Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1192        Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1193        Value::GeoPoint(lat, lon) => Some(format!(
1194            "{},{}",
1195            *lat as f64 / 1_000_000.0,
1196            *lon as f64 / 1_000_000.0
1197        )),
1198        Value::BigInt(value) => Some(value.to_string()),
1199        Value::TableRef(value) => Some(value.clone()),
1200        Value::PageRef(value) => Some(value.to_string()),
1201        Value::Password(value) => Some(value.clone()),
1202        _ => None,
1203    }
1204}
1205
/// Removes repeated column names, comparing ASCII-case-insensitively.
///
/// The first spelling of each column is kept and the relative order of the
/// survivors is preserved. Only ASCII letters are case-folded, so names that
/// differ in the case of a non-ASCII letter remain distinct.
fn dedupe_modified_columns(modified_columns: Vec<String>) -> Vec<String> {
    let capacity = modified_columns.len();
    modified_columns.into_iter().fold(
        Vec::with_capacity(capacity),
        |mut kept: Vec<String>, candidate| {
            let already_listed = kept
                .iter()
                .any(|seen| seen.eq_ignore_ascii_case(&candidate));
            if !already_listed {
                kept.push(candidate);
            }
            kept
        },
    )
}
1222
1223fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1224    if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1225        return Err(crate::RedDBError::Query(
1226            "array positional document patch paths are unsupported; replace the array or full document body instead"
1227                .to_string(),
1228        ));
1229    }
1230    Ok(())
1231}
1232
1233fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1234    match fields.get("body") {
1235        Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1236            crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1237        }),
1238        Some(_) => Err(crate::RedDBError::Query(
1239            "document body field must contain JSON".to_string(),
1240        )),
1241        None => Ok(JsonValue::Object(Default::default())),
1242    }
1243}
1244
1245fn replace_document_row_body(
1246    fields: &mut HashMap<String, Value>,
1247    body: JsonValue,
1248    modified_columns: &mut Vec<String>,
1249) -> RedDBResult<()> {
1250    modified_columns.push("body".to_string());
1251
1252    let old_keys: Vec<String> = fields
1253        .keys()
1254        .filter(|key| key.as_str() != "body")
1255        .cloned()
1256        .collect();
1257    for key in old_keys {
1258        fields.remove(&key);
1259        modified_columns.push(key);
1260    }
1261
1262    let body_bytes = json_to_vec(&body).map_err(|err| {
1263        crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1264    })?;
1265    fields.insert("body".to_string(), Value::Json(body_bytes));
1266
1267    if let JsonValue::Object(map) = &body {
1268        for (key, value) in map {
1269            fields.insert(key.clone(), json_to_storage_value(value)?);
1270            modified_columns.push(key.clone());
1271        }
1272    }
1273
1274    Ok(())
1275}
1276
1277impl RedDBRuntime {
1278    pub(crate) fn apply_loaded_patch_entity_core(
1279        &self,
1280        collection: String,
1281        mut entity: crate::storage::UnifiedEntity,
1282        payload: JsonValue,
1283        operations: Vec<PatchEntityOperation>,
1284    ) -> RedDBResult<AppliedEntityMutation> {
1285        let id = entity.id;
1286        let operations = normalize_ttl_patch_operations(operations)?;
1287        // Snapshot pre-patch row fields for the secondary-index hook —
1288        // empty for non-row entities, which is the desired no-op.
1289        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1290
1291        let db = self.db();
1292        let store = db.store();
1293        let Some(manager) = store.get_collection(&collection) else {
1294            return Err(crate::RedDBError::NotFound(format!(
1295                "collection not found: {collection}"
1296            )));
1297        };
1298
1299        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1300        let mut metadata_changed = false;
1301        let mut modified_columns: Vec<String> = Vec::new();
1302        let mut context_index_dirty = false;
1303        let mut graph_node_type: Option<String> = None;
1304        let mut graph_edge_weight: Option<f32> = None;
1305
1306        let row_contract_timestamps = db
1307            .collection_contract(&collection)
1308            .map(|c| c.timestamps_enabled)
1309            .unwrap_or(false);
1310
1311        match &mut entity.data {
1312            crate::storage::EntityData::Row(row) => {
1313                let is_document_collection = db
1314                    .collection_contract(&collection)
1315                    .map(|contract| {
1316                        contract.declared_model == crate::catalog::CollectionModel::Document
1317                    })
1318                    .unwrap_or(false);
1319                let mut field_ops = Vec::new();
1320                let mut metadata_ops = Vec::new();
1321                let mut document_body_ops = Vec::new();
1322
1323                for mut op in operations {
1324                    let Some(root) = op.path.first().map(String::as_str) else {
1325                        return Err(crate::RedDBError::Query(
1326                            "patch path cannot be empty".to_string(),
1327                        ));
1328                    };
1329
1330                    match root {
1331                        "body" if is_document_collection => {
1332                            if op.path.len() < 2 {
1333                                return Err(crate::RedDBError::Query(
1334                                    "document body patch paths require a nested key; use payload.body for full replacement"
1335                                        .to_string(),
1336                                ));
1337                            }
1338                            op.path.remove(0);
1339                            reject_document_array_position_path(&op.path)?;
1340                            document_body_ops.push(op);
1341                        }
1342                        "fields" | "named" => {
1343                            if op.path.len() < 2 {
1344                                return Err(crate::RedDBError::Query(
1345                                    "patch path 'fields' requires a nested key".to_string(),
1346                                ));
1347                            }
1348                            if row_contract_timestamps {
1349                                let leaf = op.path.get(1).map(String::as_str);
1350                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
1351                                    return Err(crate::RedDBError::Query(format!(
1352                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1353                                        collection,
1354                                        leaf.unwrap_or("")
1355                                    )));
1356                                }
1357                            }
1358                            op.path.remove(0);
1359                            field_ops.push(op);
1360                        }
1361                        "metadata" => {
1362                            if op.path.len() < 2 {
1363                                return Err(crate::RedDBError::Query(
1364                                    "patch path 'metadata' requires a nested key".to_string(),
1365                                ));
1366                            }
1367                            op.path.remove(0);
1368                            metadata_ops.push(op);
1369                        }
1370                        _ => {
1371                            return Err(crate::RedDBError::Query(format!(
1372                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1373                            )));
1374                        }
1375                    }
1376                }
1377
1378                if !document_body_ops.is_empty() {
1379                    context_index_dirty = true;
1380                    let named = row.named.get_or_insert_with(Default::default);
1381                    let mut body = document_body_from_named(named)?;
1382                    apply_patch_operations_to_json(&mut body, &document_body_ops)
1383                        .map_err(crate::RedDBError::Query)?;
1384                    replace_document_row_body(named, body, &mut modified_columns)?;
1385                }
1386
1387                if is_document_collection {
1388                    if let Some(body) = payload.get("body") {
1389                        context_index_dirty = true;
1390                        let named = row.named.get_or_insert_with(Default::default);
1391                        replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1392                    }
1393                }
1394
1395                if !field_ops.is_empty() {
1396                    context_index_dirty = true;
1397                    for op in &field_ops {
1398                        if let Some(col) = op.path.first() {
1399                            modified_columns.push(col.clone());
1400                        }
1401                    }
1402                    let named = row.named.get_or_insert_with(Default::default);
1403                    apply_patch_operations_to_storage_map(named, &field_ops)?;
1404                }
1405
1406                if let Some(fields) = payload
1407                    .get("fields")
1408                    .and_then(crate::json::Value::as_object)
1409                {
1410                    if row_contract_timestamps {
1411                        for key in fields.keys() {
1412                            if key == "created_at" || key == "updated_at" {
1413                                return Err(crate::RedDBError::Query(format!(
1414                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1415                                    collection, key
1416                                )));
1417                            }
1418                        }
1419                    }
1420                    context_index_dirty = true;
1421                    let named = row.named.get_or_insert_with(Default::default);
1422                    for (key, value) in fields {
1423                        modified_columns.push(key.clone());
1424                        named.insert(key.clone(), json_to_storage_value(value)?);
1425                    }
1426                }
1427
1428                if !metadata_ops.is_empty() {
1429                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1430                    let metadata = patch_metadata.get_or_insert_with(|| {
1431                        store.get_metadata(&collection, id).unwrap_or_default()
1432                    });
1433                    let mut metadata_json = metadata_to_json(metadata);
1434                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1435                        .map_err(crate::RedDBError::Query)?;
1436                    *metadata = metadata_from_json(&metadata_json)?;
1437                    metadata_changed = true;
1438                }
1439
1440                if !modified_columns.is_empty() || row_contract_timestamps {
1441                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1442                    let current_fields = if let Some(named) = row.named.take() {
1443                        named.into_iter().collect::<Vec<_>>()
1444                    } else if let Some(schema) = row.schema.as_ref() {
1445                        schema
1446                            .iter()
1447                            .cloned()
1448                            .zip(row.columns.iter().cloned())
1449                            .collect::<Vec<_>>()
1450                    } else {
1451                        Vec::new()
1452                    };
1453                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
1454                    if row_contract_timestamps {
1455                        modified_columns.push("updated_at".to_string());
1456                        context_index_dirty = true;
1457                    }
1458                    if contract.requires_uniqueness_check(&modified_columns) {
1459                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1460                    }
1461                    row.named = Some(normalized_fields.into_iter().collect());
1462                }
1463            }
1464            crate::storage::EntityData::Node(node) => {
1465                let mut field_ops = Vec::new();
1466                let mut metadata_ops = Vec::new();
1467                let mut node_type_ops = Vec::new();
1468
1469                for mut op in operations {
1470                    let Some(root) = op.path.first().map(String::as_str) else {
1471                        return Err(crate::RedDBError::Query(
1472                            "patch path cannot be empty".to_string(),
1473                        ));
1474                    };
1475
1476                    match root {
1477                        "fields" | "properties" => {
1478                            if op.path.len() < 2 {
1479                                return Err(crate::RedDBError::Query(
1480                                    "patch path 'fields' requires a nested key".to_string(),
1481                                ));
1482                            }
1483                            op.path.remove(0);
1484                            field_ops.push(op);
1485                        }
1486                        "metadata" => {
1487                            if op.path.len() < 2 {
1488                                return Err(crate::RedDBError::Query(
1489                                    "patch path 'metadata' requires a nested key".to_string(),
1490                                ));
1491                            }
1492                            op.path.remove(0);
1493                            metadata_ops.push(op);
1494                        }
1495                        "node_type" => {
1496                            if op.path.len() != 1 {
1497                                return Err(crate::RedDBError::Query(
1498                                    "patch path 'node_type' does not allow nested keys".to_string(),
1499                                ));
1500                            }
1501                            op.path.clear();
1502                            node_type_ops.push(op);
1503                        }
1504                        _ => {
1505                            return Err(crate::RedDBError::Query(format!(
1506                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1507                            )));
1508                        }
1509                    }
1510                }
1511
1512                for op in node_type_ops {
1513                    context_index_dirty = true;
1514                    let value = op.value.ok_or_else(|| {
1515                        crate::RedDBError::Query("node_type operations require a value".to_string())
1516                    })?;
1517
1518                    match op.op {
1519                        PatchEntityOperationType::Unset => {
1520                            return Err(crate::RedDBError::Query(
1521                                "node_type cannot be unset through patch operations".to_string(),
1522                            ));
1523                        }
1524                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1525                            let Some(node_type) = value.as_str() else {
1526                                return Err(crate::RedDBError::Query(
1527                                    "node_type operation requires a text value".to_string(),
1528                                ));
1529                            };
1530                            graph_node_type = Some(node_type.to_string());
1531                            modified_columns.push("node_type".to_string());
1532                        }
1533                    }
1534                }
1535
1536                if !field_ops.is_empty() {
1537                    context_index_dirty = true;
1538                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1539                }
1540
1541                if let Some(fields) = payload
1542                    .get("fields")
1543                    .and_then(crate::json::Value::as_object)
1544                {
1545                    context_index_dirty = true;
1546                    for (key, value) in fields {
1547                        node.properties
1548                            .insert(key.clone(), json_to_storage_value(value)?);
1549                    }
1550                }
1551
1552                if !metadata_ops.is_empty() {
1553                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1554                    let metadata = patch_metadata.get_or_insert_with(|| {
1555                        store.get_metadata(&collection, id).unwrap_or_default()
1556                    });
1557                    let mut metadata_json = metadata_to_json(metadata);
1558                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1559                        .map_err(crate::RedDBError::Query)?;
1560                    *metadata = metadata_from_json(&metadata_json)?;
1561                    metadata_changed = true;
1562                }
1563            }
1564            crate::storage::EntityData::Edge(edge) => {
1565                let mut field_ops = Vec::new();
1566                let mut metadata_ops = Vec::new();
1567                let mut weight_ops = Vec::new();
1568
1569                for mut op in operations {
1570                    let Some(root) = op.path.first().map(String::as_str) else {
1571                        return Err(crate::RedDBError::Query(
1572                            "patch path cannot be empty".to_string(),
1573                        ));
1574                    };
1575
1576                    match root {
1577                        "fields" | "properties" => {
1578                            if op.path.len() < 2 {
1579                                return Err(crate::RedDBError::Query(
1580                                    "patch path 'fields' requires a nested key".to_string(),
1581                                ));
1582                            }
1583                            op.path.remove(0);
1584                            field_ops.push(op);
1585                        }
1586                        "weight" => {
1587                            if op.path.len() != 1 {
1588                                return Err(crate::RedDBError::Query(
1589                                    "patch path 'weight' does not allow nested keys".to_string(),
1590                                ));
1591                            }
1592                            op.path.clear();
1593                            weight_ops.push(op);
1594                        }
1595                        "metadata" => {
1596                            if op.path.len() < 2 {
1597                                return Err(crate::RedDBError::Query(
1598                                    "patch path 'metadata' requires a nested key".to_string(),
1599                                ));
1600                            }
1601                            op.path.remove(0);
1602                            metadata_ops.push(op);
1603                        }
1604                        _ => {
1605                            return Err(crate::RedDBError::Query(format!(
1606                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1607                            )));
1608                        }
1609                    }
1610                }
1611
1612                if !field_ops.is_empty() {
1613                    context_index_dirty = true;
1614                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1615                }
1616
1617                for op in weight_ops {
1618                    context_index_dirty = true;
1619                    let value = op.value.ok_or_else(|| {
1620                        crate::RedDBError::Query("weight operations require a value".to_string())
1621                    })?;
1622
1623                    match op.op {
1624                        PatchEntityOperationType::Unset => {
1625                            return Err(crate::RedDBError::Query(
1626                                "weight cannot be unset through patch operations".to_string(),
1627                            ));
1628                        }
1629                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1630                            let Some(weight) = value.as_f64() else {
1631                                return Err(crate::RedDBError::Query(
1632                                    "weight operation requires a numeric value".to_string(),
1633                                ));
1634                            };
1635                            graph_edge_weight = Some(weight as f32);
1636                            edge.weight = weight as f32;
1637                            modified_columns.push("weight".to_string());
1638                        }
1639                    }
1640                }
1641
1642                if let Some(fields) = payload
1643                    .get("fields")
1644                    .and_then(crate::json::Value::as_object)
1645                {
1646                    context_index_dirty = true;
1647                    for (key, value) in fields {
1648                        edge.properties
1649                            .insert(key.clone(), json_to_storage_value(value)?);
1650                    }
1651                }
1652
1653                if !metadata_ops.is_empty() {
1654                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1655                    let metadata = patch_metadata.get_or_insert_with(|| {
1656                        store.get_metadata(&collection, id).unwrap_or_default()
1657                    });
1658                    let mut metadata_json = metadata_to_json(metadata);
1659                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1660                        .map_err(crate::RedDBError::Query)?;
1661                    *metadata = metadata_from_json(&metadata_json)?;
1662                    metadata_changed = true;
1663                }
1664            }
1665            crate::storage::EntityData::Vector(vector) => {
1666                let mut field_ops = Vec::new();
1667                let mut metadata_ops = Vec::new();
1668
1669                for mut op in operations {
1670                    let Some(root) = op.path.first().map(String::as_str) else {
1671                        return Err(crate::RedDBError::Query(
1672                            "patch path cannot be empty".to_string(),
1673                        ));
1674                    };
1675
1676                    match root {
1677                        "fields" => {
1678                            if op.path.len() < 2 {
1679                                return Err(crate::RedDBError::Query(
1680                                    "patch path 'fields' requires a nested key".to_string(),
1681                                ));
1682                            }
1683                            op.path.remove(0);
1684                            let Some(target) = op.path.first().map(String::as_str) else {
1685                                return Err(crate::RedDBError::Query(
1686                                    "patch path requires a target under fields".to_string(),
1687                                ));
1688                            };
1689                            if !matches!(target, "dense" | "content" | "sparse") {
1690                                return Err(crate::RedDBError::Query(format!(
1691                                    "unsupported vector patch target '{target}'"
1692                                )));
1693                            }
1694                            field_ops.push(op);
1695                        }
1696                        "metadata" => {
1697                            if op.path.len() < 2 {
1698                                return Err(crate::RedDBError::Query(
1699                                    "patch path 'metadata' requires a nested key".to_string(),
1700                                ));
1701                            }
1702                            op.path.remove(0);
1703                            metadata_ops.push(op);
1704                        }
1705                        _ => {
1706                            return Err(crate::RedDBError::Query(format!(
1707                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1708                            )));
1709                        }
1710                    }
1711                }
1712
1713                if !field_ops.is_empty() {
1714                    context_index_dirty = true;
1715                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1716                }
1717
1718                if let Some(fields) = payload
1719                    .get("fields")
1720                    .and_then(crate::json::Value::as_object)
1721                {
1722                    context_index_dirty = true;
1723                    if let Some(content) =
1724                        fields.get("content").and_then(crate::json::Value::as_str)
1725                    {
1726                        vector.content = Some(content.to_string());
1727                    }
1728                    if let Some(dense) = fields.get("dense") {
1729                        vector.dense = dense
1730                            .as_array()
1731                            .ok_or_else(|| {
1732                                crate::RedDBError::Query(
1733                                    "field 'dense' must be an array".to_string(),
1734                                )
1735                            })?
1736                            .iter()
1737                            .map(|value| {
1738                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
1739                                    crate::RedDBError::Query(
1740                                        "field 'dense' must contain only numbers".to_string(),
1741                                    )
1742                                })
1743                            })
1744                            .collect::<Result<Vec<_>, _>>()?;
1745                    }
1746                }
1747
1748                if !metadata_ops.is_empty() {
1749                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1750                    let metadata = patch_metadata.get_or_insert_with(|| {
1751                        store.get_metadata(&collection, id).unwrap_or_default()
1752                    });
1753                    let mut metadata_json = metadata_to_json(metadata);
1754                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1755                        .map_err(crate::RedDBError::Query)?;
1756                    *metadata = metadata_from_json(&metadata_json)?;
1757                    metadata_changed = true;
1758                }
1759            }
1760            crate::storage::EntityData::TimeSeries(_)
1761            | crate::storage::EntityData::QueueMessage(_) => {
1762                return Err(crate::RedDBError::Query(
1763                    "patch operations are not supported for TimeSeries or QueueMessage entities"
1764                        .to_string(),
1765                ));
1766            }
1767        }
1768
1769        if let Some(node_type) = graph_node_type {
1770            if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1771                node.node_type = node_type;
1772            }
1773        }
1774        if let Some(weight) = graph_edge_weight {
1775            if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1776                edge.weight = (weight * 1000.0) as u32;
1777            }
1778        }
1779
1780        if let Some(metadata) = payload
1781            .get("metadata")
1782            .and_then(crate::json::Value::as_object)
1783        {
1784            let patch_metadata = patch_metadata
1785                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1786            for (key, value) in metadata {
1787                ensure_non_tree_reserved_metadata_key(key)?;
1788                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1789            }
1790            metadata_changed = true;
1791        }
1792
1793        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1794            let patch_metadata = patch_metadata
1795                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1796            if matches!(value, crate::storage::unified::MetadataValue::Null) {
1797                patch_metadata.remove(&key);
1798            } else {
1799                patch_metadata.set(key, value);
1800            }
1801            metadata_changed = true;
1802        }
1803
1804        entity.updated_at = std::time::SystemTime::now()
1805            .duration_since(std::time::UNIX_EPOCH)
1806            .unwrap_or_default()
1807            .as_secs();
1808
1809        modified_columns = dedupe_modified_columns(modified_columns);
1810
1811        Ok(AppliedEntityMutation {
1812            id,
1813            collection,
1814            entity,
1815            metadata: patch_metadata,
1816            modified_columns,
1817            persist_metadata: metadata_changed,
1818            context_index_dirty,
1819            replaced_entity: None,
1820            replaced_entity_previous_xmax: 0,
1821            pre_mutation_fields,
1822        })
1823    }
1824
1825    pub(crate) fn apply_loaded_sql_update_row_core(
1826        &self,
1827        collection: String,
1828        mut entity: crate::storage::UnifiedEntity,
1829        static_field_assignments: &[(String, Value)],
1830        dynamic_field_assignments: Vec<(String, Value)>,
1831        static_metadata_assignments: &[(String, MetadataValue)],
1832        dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1833        row_contract_plan: Option<&RowUpdateContractPlan>,
1834        row_modified_columns_template: &[String],
1835        row_touches_unique_columns: bool,
1836    ) -> RedDBResult<AppliedEntityMutation> {
1837        let id = entity.id;
1838        let previous_xmax = entity.xmax;
1839        let db = self.db();
1840        let store = db.store();
1841        let Some(_) = store.get_collection(&collection) else {
1842            return Err(crate::RedDBError::NotFound(format!(
1843                "collection not found: {collection}"
1844            )));
1845        };
1846
1847        let versioned_update_xid = match self.current_xid() {
1848            Some(xid) => Some(xid),
1849            None => {
1850                let snapshot_manager = self.snapshot_manager();
1851                let xid = snapshot_manager.begin();
1852                snapshot_manager.commit(xid);
1853                Some(xid)
1854            }
1855        };
1856        let mut replaced_entity = versioned_update_xid.map(|xid| {
1857            let mut old = entity.clone();
1858            old.set_xmax(xid);
1859            old
1860        });
1861
1862        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1863        let row_contract_timestamps = row_contract_plan
1864            .map(|plan| plan.timestamps_enabled)
1865            .unwrap_or(false);
1866        let mut metadata_changed = false;
1867        let mut modified_columns = row_modified_columns_template.to_vec();
1868        let mut context_index_dirty = !modified_columns.is_empty();
1869
1870        // Snapshot OLD field values BEFORE applying the assignments —
1871        // the secondary-index maintenance hook needs both before/after to
1872        // delete-then-insert under changed indexed columns.
1873        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1874
1875        let crate::storage::EntityData::Row(row) = &mut entity.data else {
1876            return Err(crate::RedDBError::Query(
1877                "SQL row update fast path requires a row entity".to_string(),
1878            ));
1879        };
1880
1881        let _ = row_contract_plan;
1882        apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1883        apply_row_field_assignments_raw(row, dynamic_field_assignments);
1884
1885        for (key, value) in static_metadata_assignments
1886            .iter()
1887            .cloned()
1888            .chain(dynamic_metadata_assignments)
1889        {
1890            ensure_non_tree_reserved_metadata_key(&key)?;
1891            patch_metadata
1892                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1893                .set(key, value);
1894            metadata_changed = true;
1895        }
1896
1897        if !modified_columns.is_empty() || row_contract_timestamps {
1898            let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1899            if row_contract_timestamps {
1900                context_index_dirty = true;
1901                set_row_field(
1902                    row,
1903                    "updated_at",
1904                    Value::UnsignedInteger(current_unix_ms_u64()),
1905                );
1906                modified_columns.push("updated_at".to_string());
1907            }
1908            if row_touches_unique_columns {
1909                let current_fields = collect_row_fields(row);
1910                contract.enforce_row_uniqueness(&current_fields, Some(id))?;
1911            }
1912        }
1913
1914        entity.updated_at = std::time::SystemTime::now()
1915            .duration_since(std::time::UNIX_EPOCH)
1916            .unwrap_or_default()
1917            .as_secs();
1918
1919        if let Some(xid) = versioned_update_xid {
1920            let logical_id = entity.logical_id();
1921            entity.id = store.next_entity_id();
1922            entity.set_logical_id(logical_id);
1923            entity.set_xmin(xid);
1924            entity.set_xmax(0);
1925            if let Some(old) = replaced_entity.as_mut() {
1926                old.set_xmax(xid);
1927            }
1928        }
1929
1930        modified_columns = dedupe_modified_columns(modified_columns);
1931
1932        Ok(AppliedEntityMutation {
1933            id: entity.id,
1934            collection,
1935            entity,
1936            metadata: patch_metadata,
1937            modified_columns,
1938            persist_metadata: metadata_changed,
1939            context_index_dirty,
1940            replaced_entity,
1941            replaced_entity_previous_xmax: previous_xmax,
1942            pre_mutation_fields,
1943        })
1944    }
1945
1946    pub(crate) fn persist_applied_entity_mutations(
1947        &self,
1948        applied: &[AppliedEntityMutation],
1949    ) -> RedDBResult<()> {
1950        if applied.is_empty() {
1951            return Ok(());
1952        }
1953
1954        let store = self.db().store();
1955        let collection = &applied[0].collection;
1956        let Some(manager) = store.get_collection(collection) else {
1957            return Err(crate::RedDBError::NotFound(format!(
1958                "collection not found: {collection}"
1959            )));
1960        };
1961
1962        let mut ordinary = Vec::with_capacity(applied.len());
1963        for item in applied {
1964            if let Some(old_version) = item.replaced_entity.as_ref() {
1965                store
1966                    .install_versioned_table_row_update(
1967                        collection,
1968                        old_version.clone(),
1969                        item.entity.clone(),
1970                        if item.persist_metadata {
1971                            item.metadata.as_ref()
1972                        } else {
1973                            None
1974                        },
1975                    )
1976                    .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1977                if self.current_xid().is_some() {
1978                    self.record_pending_versioned_update(
1979                        crate::runtime::impl_core::current_connection_id(),
1980                        collection,
1981                        old_version.id,
1982                        item.entity.id,
1983                        old_version.xmax,
1984                        item.replaced_entity_previous_xmax,
1985                    );
1986                }
1987            } else {
1988                ordinary.push(item);
1989            }
1990        }
1991        if ordinary.is_empty() {
1992            return Ok(());
1993        }
1994
1995        manager
1996            .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1997                (
1998                    &item.entity,
1999                    item.modified_columns.as_slice(),
2000                    if item.persist_metadata {
2001                        item.metadata.as_ref()
2002                    } else {
2003                        None
2004                    },
2005                )
2006            }))
2007            .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
2008
2009        // PG-HOT-like fast path: segment in-place is done; when no
2010        // mutation touches a secondary-indexed column AND no metadata
2011        // payload needs to be folded into a B-tree record, skip the
2012        // in-line B-tree upsert. The WAL still records the update
2013        // (durability preserved; recovery replay rebuilds the B-tree),
2014        // and `manager.get()` prefers the live segment over the
2015        // B-tree for reads — so the short-circuit is invisible to
2016        // callers. See `persist_entities_to_pager_wal_only`.
2017        let indexed_cols = self
2018            .index_store_ref()
2019            .indexed_columns_set(collection.as_str());
2020        let all_hot = !indexed_cols.is_empty()
2021            && ordinary.iter().all(|item| {
2022                !item.persist_metadata
2023                    && !item
2024                        .modified_columns
2025                        .iter()
2026                        .any(|c| indexed_cols.contains(c))
2027            })
2028            || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2029
2030        // Pass `&[&UnifiedEntity]` — no per-entity clone. The SQL UPDATE
2031        // inner loop hands us `applied` which already owns the post-image
2032        // entity; all we need for the persist path is a read borrow.
2033        let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2034            ordinary.iter().map(|item| &item.entity).collect();
2035        let persist_fn = if all_hot {
2036            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2037        } else {
2038            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2039        };
2040        persist_fn(store.as_ref(), collection, &entity_refs)
2041            .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2042    }
2043
2044    pub(crate) fn flush_applied_entity_mutation(
2045        &self,
2046        applied: &AppliedEntityMutation,
2047    ) -> RedDBResult<()> {
2048        let store = self.db().store();
2049        if applied.context_index_dirty {
2050            store
2051                .context_index()
2052                .index_entity(&applied.collection, &applied.entity);
2053        }
2054        // Secondary-index maintenance for SQL UPDATE / JSON-Patch flows.
2055        // Skip when pre_mutation_fields is empty (entity wasn't a row, or
2056        // didn't carry recoverable column names) — there's nothing to
2057        // delete-then-insert in that case.
2058        //
2059        // Also build the CDC damage vector here so downstream consumers
2060        // see which columns changed without re-diffing.
2061        let mut changed_columns: Option<Vec<String>> = None;
2062        if !applied.pre_mutation_fields.is_empty() {
2063            let post = entity_row_fields_snapshot(&applied.entity);
2064            if !post.is_empty() {
2065                let damage = crate::application::entity::row_damage_vector(
2066                    &applied.pre_mutation_fields,
2067                    &post,
2068                );
2069                if !damage.is_empty() {
2070                    changed_columns = Some(
2071                        damage
2072                            .touched_columns()
2073                            .into_iter()
2074                            .map(str::to_string)
2075                            .collect(),
2076                    );
2077                }
2078
2079                // HOT-like fast path (P3.T2/T3): when no modified
2080                // column is covered by a secondary index, skip the
2081                // `index_entity_update` call entirely. The function
2082                // would short-circuit internally, but the call still
2083                // reads the registry lock + walks the damage vector
2084                // — avoiding it saves a few microseconds per UPDATE.
2085                // Page-local replace + t_ctid chain support (true
2086                // HOT) lives in a follow-up storage spec.
2087                let indexed_cols: std::collections::HashSet<String> = self
2088                    .index_store_ref()
2089                    .list_indices(applied.collection.as_str())
2090                    .into_iter()
2091                    .filter_map(|idx| idx.columns.first().cloned())
2092                    .collect();
2093                let modified_cols: std::collections::HashSet<String> = damage
2094                    .touched_columns()
2095                    .into_iter()
2096                    .map(str::to_string)
2097                    .collect();
2098                if let Some(old_version) = applied.replaced_entity.as_ref() {
2099                    let old_index_fields: Vec<(String, Value)> = applied
2100                        .pre_mutation_fields
2101                        .iter()
2102                        .filter(|(col, _)| indexed_cols.contains(col))
2103                        .cloned()
2104                        .collect();
2105                    let new_index_fields: Vec<(String, Value)> = post
2106                        .iter()
2107                        .filter(|(col, _)| indexed_cols.contains(col))
2108                        .cloned()
2109                        .collect();
2110                    if !old_index_fields.is_empty() {
2111                        self.index_store_ref()
2112                            .index_entity_delete(
2113                                &applied.collection,
2114                                old_version.id,
2115                                &old_index_fields,
2116                            )
2117                            .map_err(crate::RedDBError::Internal)?;
2118                    }
2119                    if !new_index_fields.is_empty() {
2120                        self.index_store_ref()
2121                            .index_entity_insert(
2122                                &applied.collection,
2123                                applied.entity.id,
2124                                &new_index_fields,
2125                            )
2126                            .map_err(crate::RedDBError::Internal)?;
2127                    }
2128                } else {
2129                    let decision = crate::storage::engine::hot_update::decide(
2130                        &crate::storage::engine::hot_update::HotUpdateInputs {
2131                            collection: applied.collection.as_str(),
2132                            indexed_columns: &indexed_cols,
2133                            modified_columns: &modified_cols,
2134                            // The storage layer currently handles fit via
2135                            // the segment abstraction; we bypass the
2136                            // page-size check here.
2137                            new_tuple_size: 0,
2138                            page_free_space: usize::MAX,
2139                        },
2140                    );
2141                    if !decision.can_hot {
2142                        self.index_store_ref()
2143                            .index_entity_update(
2144                                &applied.collection,
2145                                applied.id,
2146                                &applied.pre_mutation_fields,
2147                                &post,
2148                            )
2149                            .map_err(crate::RedDBError::Internal)?;
2150                    } else {
2151                        // F-04: `applied.collection` is tenant-supplied;
2152                        // strip CR/LF/control bytes via the LogField
2153                        // escaper (ADR 0010).
2154                        tracing::debug!(
2155                            collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2156                            "hot_update fast-path: skipped index_entity_update"
2157                        );
2158                    }
2159                }
2160            }
2161        }
2162        self.cdc_emit_prebuilt_with_columns(
2163            crate::replication::cdc::ChangeOperation::Update,
2164            &applied.collection,
2165            &applied.entity,
2166            "entity",
2167            applied.metadata.as_ref(),
2168            true,
2169            changed_columns,
2170        );
2171        Ok(())
2172    }
2173
2174    pub(crate) fn apply_loaded_patch_entity(
2175        &self,
2176        collection: String,
2177        entity: crate::storage::UnifiedEntity,
2178        payload: JsonValue,
2179        operations: Vec<PatchEntityOperation>,
2180    ) -> RedDBResult<CreateEntityOutput> {
2181        let applied =
2182            self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2183        self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2184        self.flush_applied_entity_mutation(&applied)?;
2185        Ok(CreateEntityOutput {
2186            id: applied.id,
2187            entity: Some(applied.entity),
2188        })
2189    }
2190}
2191
2192fn ensure_non_tree_reserved_metadata_patch_paths(
2193    operations: &[PatchEntityOperation],
2194) -> RedDBResult<()> {
2195    for operation in operations {
2196        let Some(key) = operation.path.first().map(String::as_str) else {
2197            continue;
2198        };
2199        ensure_non_tree_reserved_metadata_key(key)?;
2200    }
2201    Ok(())
2202}
2203
2204fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2205    if key.starts_with(TREE_METADATA_PREFIX) {
2206        return Err(crate::RedDBError::Query(format!(
2207            "metadata key '{}' is reserved for managed trees",
2208            key
2209        )));
2210    }
2211    Ok(())
2212}
2213
2214fn ensure_non_tree_reserved_metadata_entries(
2215    metadata: &[(String, MetadataValue)],
2216) -> RedDBResult<()> {
2217    for (key, _) in metadata {
2218        ensure_non_tree_reserved_metadata_key(key)?;
2219    }
2220    Ok(())
2221}
2222
2223fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2224    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2225        return Err(crate::RedDBError::Query(format!(
2226            "edge label '{}' is reserved for managed trees",
2227            TREE_CHILD_EDGE_LABEL
2228        )));
2229    }
2230    Ok(())
2231}
2232
2233impl RedDBRuntime {
2234    pub(crate) fn create_node_unchecked(
2235        &self,
2236        input: CreateNodeInput,
2237    ) -> RedDBResult<CreateEntityOutput> {
2238        let db = self.db();
2239        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2240        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2241        let mut metadata = input.metadata;
2242        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2243        let mut builder = db.node(&input.collection, &input.label);
2244
2245        if let Some(node_type) = input.node_type {
2246            builder = builder.node_type(node_type);
2247        }
2248
2249        for (key, value) in input.properties {
2250            builder = builder.property(key, value);
2251        }
2252
2253        for (key, value) in metadata {
2254            builder = builder.metadata(key, value);
2255        }
2256
2257        for embedding in input.embeddings {
2258            if let Some(model) = embedding.model {
2259                builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2260            } else {
2261                builder = builder.embedding(embedding.name, embedding.vector);
2262            }
2263        }
2264
2265        for link in input.table_links {
2266            builder = builder.link_to_table(link.key, link.table);
2267        }
2268
2269        for link in input.node_links {
2270            builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2271        }
2272
2273        let id = builder.save()?;
2274        // Phase 1.1 MVCC universal: stamp xmin so concurrent snapshots
2275        // don't see this node until the transaction commits.
2276        self.stamp_xmin_if_in_txn(&input.collection, id);
2277        refresh_context_index(&db, &input.collection, id)?;
2278        self.cdc_emit(
2279            crate::replication::cdc::ChangeOperation::Insert,
2280            &input.collection,
2281            id.raw(),
2282            "graph_node",
2283        );
2284        Ok(CreateEntityOutput {
2285            id,
2286            entity: db.store().get(&input.collection, id),
2287        })
2288    }
2289
2290    pub(crate) fn create_edge_unchecked(
2291        &self,
2292        input: CreateEdgeInput,
2293    ) -> RedDBResult<CreateEntityOutput> {
2294        let db = self.db();
2295        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2296        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2297        let mut metadata = input.metadata;
2298        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2299        let mut builder = db
2300            .edge(&input.collection, &input.label)
2301            .from(input.from)
2302            .to(input.to);
2303
2304        if let Some(weight) = input.weight {
2305            builder = builder.weight(weight);
2306        }
2307
2308        for (key, value) in input.properties {
2309            builder = builder.property(key, value);
2310        }
2311
2312        for (key, value) in metadata {
2313            builder = builder.metadata(key, value);
2314        }
2315
2316        let id = builder.save()?;
2317        // Phase 1.1 MVCC universal: stamp xmin on the edge so other
2318        // sessions don't follow it until COMMIT.
2319        self.stamp_xmin_if_in_txn(&input.collection, id);
2320        refresh_context_index(&db, &input.collection, id)?;
2321        self.cdc_emit(
2322            crate::replication::cdc::ChangeOperation::Insert,
2323            &input.collection,
2324            id.raw(),
2325            "graph_edge",
2326        );
2327        Ok(CreateEntityOutput {
2328            id,
2329            entity: db.store().get(&input.collection, id),
2330        })
2331    }
2332}
2333
2334fn create_rows_batch_prevalidated_columnar_with_outputs(
2335    runtime: &RedDBRuntime,
2336    collection: String,
2337    column_names: std::sync::Arc<Vec<String>>,
2338    rows: Vec<Vec<crate::storage::schema::Value>>,
2339) -> RedDBResult<Vec<CreateEntityOutput>> {
2340    use crate::storage::{
2341        unified::{EntityData, EntityKind, RowData},
2342        EntityId, UnifiedEntity,
2343    };
2344    use std::sync::Arc;
2345
2346    if rows.is_empty() {
2347        return Ok(Vec::new());
2348    }
2349    runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2350    runtime.check_batch_size(rows.len())?;
2351    runtime.check_db_size()?;
2352
2353    let db = runtime.db();
2354    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2355    contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2356
2357    let store = db.store();
2358    let table_arc: Arc<str> = Arc::from(collection.as_str());
2359
2360    let indexed_cols = runtime
2361        .index_store_ref()
2362        .indexed_columns_set(collection.as_str());
2363    let has_secondary_indexes = !indexed_cols.is_empty();
2364    let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2365        if has_secondary_indexes {
2366            Vec::with_capacity(rows.len())
2367        } else {
2368            Vec::new()
2369        };
2370
2371    let entities: Vec<UnifiedEntity> = rows
2372        .into_iter()
2373        .map(|values| {
2374            if has_secondary_indexes {
2375                let mut snap: Vec<(String, crate::storage::schema::Value)> =
2376                    Vec::with_capacity(indexed_cols.len());
2377                for (name, val) in column_names.iter().zip(values.iter()) {
2378                    if indexed_cols.contains(name) {
2379                        snap.push((name.clone(), val.clone()));
2380                    }
2381                }
2382                field_snapshots.push(snap);
2383            }
2384            let mut row = RowData::new(values);
2385            row.schema = Some(Arc::clone(&column_names));
2386            UnifiedEntity::new(
2387                EntityId::new(0),
2388                EntityKind::TableRow {
2389                    table: Arc::clone(&table_arc),
2390                    row_id: 0,
2391                },
2392                EntityData::Row(row),
2393            )
2394        })
2395        .collect();
2396
2397    let ids = store
2398        .bulk_insert(&collection, entities)
2399        .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2400
2401    if has_secondary_indexes {
2402        let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2403            .iter()
2404            .zip(field_snapshots)
2405            .map(|(id, fields)| (*id, fields))
2406            .collect();
2407        runtime
2408            .index_store_ref()
2409            .index_entity_insert_batch(&collection, &index_rows)
2410            .map_err(crate::RedDBError::Internal)?;
2411    }
2412
2413    runtime.invalidate_result_cache();
2414    runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2415
2416    Ok(ids
2417        .into_iter()
2418        .map(|id| CreateEntityOutput { id, entity: None })
2419        .collect())
2420}
2421
2422impl RuntimeEntityPort for RedDBRuntime {
2423    fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2424        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2425        let db = self.db();
2426        let CreateRowInput {
2427            collection,
2428            fields,
2429            metadata: input_metadata,
2430            node_links,
2431            vector_links,
2432        } = input;
2433        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2434        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2435        let mut metadata = input_metadata;
2436        apply_collection_default_ttl(&db, &collection, &mut metadata);
2437        let fields = contract.normalize_insert_fields(fields)?;
2438        contract.enforce_row_uniqueness(&fields, None)?;
2439        // Route through MutationEngine for unified hot path.
2440        let engine = self.mutation_engine();
2441        let result = engine.apply(
2442            collection.clone(),
2443            vec![crate::runtime::mutation::MutationRow {
2444                fields,
2445                metadata,
2446                node_links,
2447                vector_links,
2448            }],
2449        )?;
2450        let id = result.ids[0];
2451        // Perf: `db.get(id)` does a *cross-collection* scan (get_any)
2452        // that also takes a write lock on the entity cache. We know
2453        // the collection — hit the manager directly. Cuts
2454        // create_row() p50 roughly in half on the hot path.
2455        Ok(CreateEntityOutput {
2456            id,
2457            entity: db.store().get(&collection, id),
2458        })
2459    }
2460
2461    fn create_rows_batch(
2462        &self,
2463        input: CreateRowsBatchInput,
2464    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2465        if input.rows.is_empty() {
2466            return Ok(Vec::new());
2467        }
2468        self.check_batch_size(input.rows.len())?;
2469        self.check_db_size()?;
2470
2471        let db = self.db();
2472        let collection = input.collection;
2473        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2474        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2475
2476        let mut prepared_rows = Vec::with_capacity(input.rows.len());
2477        let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2478        for row in input.rows {
2479            if row.collection != collection {
2480                return Err(crate::RedDBError::Query(format!(
2481                    "batch row collection mismatch: expected '{}', got '{}'",
2482                    collection, row.collection
2483                )));
2484            }
2485
2486            let mut metadata = row.metadata;
2487            apply_collection_default_ttl(&db, &collection, &mut metadata);
2488            let fields = contract.normalize_insert_fields(row.fields)?;
2489            contract.enforce_row_uniqueness(&fields, None)?;
2490            uniqueness_rows.push(fields.clone());
2491            prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2492        }
2493
2494        contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2495
2496        // Route through MutationEngine: single bulk_insert + one CDC batch
2497        // instead of N separate cdc_emit() calls (each acquires a write lock).
2498        let engine = {
2499            let e = self.mutation_engine();
2500            if input.suppress_events {
2501                e.with_suppress_events()
2502            } else {
2503                e
2504            }
2505        };
2506        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2507            .into_iter()
2508            .map(|(fields, metadata, node_links, vector_links)| {
2509                crate::runtime::mutation::MutationRow {
2510                    fields,
2511                    metadata,
2512                    node_links,
2513                    vector_links,
2514                }
2515            })
2516            .collect();
2517
2518        let result = engine
2519            .apply(collection.clone(), mutation_rows)
2520            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2521
2522        let store = db.store();
2523        Ok(result
2524            .ids
2525            .into_iter()
2526            .map(|id| CreateEntityOutput {
2527                id,
2528                entity: store.get(&collection, id),
2529            })
2530            .collect())
2531    }
2532
2533    fn create_rows_batch_prevalidated_columnar(
2534        &self,
2535        collection: String,
2536        column_names: std::sync::Arc<Vec<String>>,
2537        rows: Vec<Vec<crate::storage::schema::Value>>,
2538    ) -> RedDBResult<usize> {
2539        create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2540            .map(|outputs| outputs.len())
2541    }
2542
2543    fn create_rows_batch_columnar(
2544        &self,
2545        collection: String,
2546        column_names: std::sync::Arc<Vec<String>>,
2547        rows: Vec<Vec<crate::storage::schema::Value>>,
2548    ) -> RedDBResult<usize> {
2549        self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2550            .map(|outputs| outputs.len())
2551    }
2552
2553    fn create_rows_batch_columnar_with_outputs(
2554        &self,
2555        collection: String,
2556        column_names: std::sync::Arc<Vec<String>>,
2557        rows: Vec<Vec<crate::storage::schema::Value>>,
2558    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2559        if rows.is_empty() {
2560            return Ok(Vec::new());
2561        }
2562        self.check_batch_size(rows.len())?;
2563        self.check_db_size()?;
2564
2565        let db = self.db();
2566        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2567        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2568
2569        // Fast path: when the collection carries no contract (or the
2570        // contract has no declared columns) `normalize_insert_fields`
2571        // is a no-op and `enforce_row_uniqueness` finds no unique
2572        // columns to check. Skip the per-row `(String, Value)` tuple
2573        // materialisation entirely and route through the prevalidated
2574        // columnar kernel — same write, CDC, and index path, just
2575        // without the wasted (String, Value) clones. This is the
2576        // bench `bench_users` shape (no contract declared by the
2577        // adapter's `setup_schema`).
2578        let needs_normalisation = match db.collection_contract(&collection) {
2579            Some(c) => {
2580                c.declared_model == crate::catalog::CollectionModel::Table
2581                    && (!c.declared_columns.is_empty()
2582                        || c.table_def
2583                            .as_ref()
2584                            .map(|t| !t.columns.is_empty())
2585                            .unwrap_or(false))
2586            }
2587            None => false,
2588        };
2589        if !needs_normalisation {
2590            return create_rows_batch_prevalidated_columnar_with_outputs(
2591                self,
2592                collection,
2593                column_names,
2594                rows,
2595            );
2596        }
2597
2598        // Slow path: contract requires per-row normalisation /
2599        // uniqueness checks. Materialise `Vec<(String, Value)>` from
2600        // the columnar layout (this still pays N×ncols `String::clone`
2601        // — but it's deferred out of the wire decoder hot loop and
2602        // happens behind a runtime API, not the per-row decode loop)
2603        // and fall through to the existing `create_rows_batch` path.
2604        let ncols = column_names.len();
2605        let tuple_rows: Vec<CreateRowInput> = rows
2606            .into_iter()
2607            .map(|values| {
2608                let mut fields: Vec<(String, crate::storage::schema::Value)> =
2609                    Vec::with_capacity(ncols);
2610                for (name, value) in column_names.iter().zip(values) {
2611                    fields.push((name.clone(), value));
2612                }
2613                CreateRowInput {
2614                    collection: collection.clone(),
2615                    fields,
2616                    metadata: Vec::new(),
2617                    node_links: Vec::new(),
2618                    vector_links: Vec::new(),
2619                }
2620            })
2621            .collect();
2622        self.create_rows_batch(CreateRowsBatchInput {
2623            collection,
2624            rows: tuple_rows,
2625            suppress_events: false,
2626        })
2627    }
2628
2629    fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2630        if input.rows.is_empty() {
2631            return Ok(0);
2632        }
2633        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2634        self.check_batch_size(input.rows.len())?;
2635        self.check_db_size()?;
2636
2637        let db = self.db();
2638        let collection = input.collection;
2639        // Still verify the collection's declared model before we blast
2640        // rows at it — this one-off check is O(1), independent of
2641        // ncols, and catches schema-kind mismatches that the client
2642        // can't always see.
2643        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2644        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2645
2646        // Hoist the per-collection default TTL lookup out of the
2647        // per-row loop — it only depends on the collection, not on
2648        // individual rows, and the old path did one HashMap read per
2649        // row (25k reads for a 25k typed_insert bulk). Fetch it once,
2650        // apply to any row whose metadata doesn't already carry a TTL.
2651        let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2652
2653        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2654            Vec::with_capacity(input.rows.len());
2655        let mut mutation_rows = mutation_rows;
2656        for row in input.rows {
2657            if row.collection != collection {
2658                return Err(crate::RedDBError::Query(format!(
2659                    "batch row collection mismatch: expected '{}', got '{}'",
2660                    collection, row.collection
2661                )));
2662            }
2663            let mut metadata = row.metadata;
2664            if let Some(ttl) = default_ttl_ms {
2665                if !has_internal_ttl_metadata(&metadata) {
2666                    metadata.push((
2667                        "_ttl_ms".to_string(),
2668                        if ttl <= i64::MAX as u64 {
2669                            MetadataValue::Int(ttl as i64)
2670                        } else {
2671                            MetadataValue::Timestamp(ttl)
2672                        },
2673                    ));
2674                }
2675            }
2676            mutation_rows.push(crate::runtime::mutation::MutationRow {
2677                fields: row.fields,
2678                metadata,
2679                node_links: row.node_links,
2680                vector_links: row.vector_links,
2681            });
2682        }
2683
2684        let engine = self.mutation_engine();
2685        let result = engine
2686            .apply(collection, mutation_rows)
2687            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2688        Ok(result.ids.len())
2689    }
2690
2691    fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2692        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2693        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2694            input.properties.iter().map(|(key, _)| key.as_str()),
2695            &format!("node '{}'", input.collection),
2696        )?;
2697        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2698        self.create_node_unchecked(input)
2699    }
2700
2701    fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2702        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2703        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2704            input.properties.iter().map(|(key, _)| key.as_str()),
2705            &format!("edge '{}'", input.collection),
2706        )?;
2707        ensure_non_tree_structural_edge_label(&input.label)?;
2708        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2709        self.create_edge_unchecked(input)
2710    }
2711
2712    fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2713        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2714        let db = self.db();
2715        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2716        contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2717        ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2718        let mut metadata = input.metadata;
2719        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2720        let mut builder = db.vector(&input.collection).dense(input.dense);
2721
2722        if let Some(content) = input.content {
2723            builder = builder.content(content);
2724        }
2725
2726        for (key, value) in metadata {
2727            builder = builder.metadata(key, value);
2728        }
2729
2730        if let Some(link_row) = input.link_row {
2731            builder = builder.link_to_table(link_row);
2732        }
2733
2734        if let Some(link_node) = input.link_node {
2735            builder = builder.link_to_node(link_node);
2736        }
2737
2738        let id = builder.save()?;
2739        let dense_for_turbo = match db.store().get(&input.collection, id).map(|e| e.data) {
2740            Some(crate::storage::unified::EntityData::Vector(data)) => Some(data.dense.clone()),
2741            _ => None,
2742        };
2743        // Phase 1.1 MVCC universal: stamp xmin on the vector so
2744        // concurrent ANN scans hide it until the transaction commits.
2745        self.stamp_xmin_if_in_txn(&input.collection, id);
2746        refresh_context_index(&db, &input.collection, id)?;
2747        // Issue #693 — vector.turbo write path. Order matters and
2748        // matches the write sequence the next slice (#673) will
2749        // attach kill-points to: WAL append → in-memory index update
2750        // → TurboExtent append. The legacy `vector` path is the
2751        // `None` branch and is untouched.
2752        if let (Some(state), Some(dense)) = (db.turbo_state(&input.collection), dense_for_turbo) {
2753            // Lazy populate so an INSERT after restart sees the
2754            // pre-restart vectors in the index before appending its
2755            // own.
2756            state.ensure_populated(&db.store(), &input.collection);
2757            // Issue #694 — named crash boundaries for #673.
2758            use crate::runtime::turbo_crash_inject::{fire, InjectionPoint};
2759            fire(InjectionPoint::BeforeWalFsync);
2760            let _ = db
2761                .store()
2762                .append_vector_insert_record(&input.collection, id.raw(), &dense);
2763            fire(InjectionPoint::BeforeIndexCommit);
2764            let mut index = state.index.lock();
2765            index.insert(id, dense.clone());
2766            // Mirror the encoded codes into the per-collection
2767            // TurboExtent when one is available. The bytes are the
2768            // 4-bit-packed code groups (`n_byte_groups`) plus the
2769            // little-endian L2 norm — same shape `BlockedCodeStorage`
2770            // appends in-memory. Failures are non-fatal in this
2771            // slice; durability of the extent is owned by #673.
2772            fire(InjectionPoint::BeforeExtentFsync);
2773            if let Some(extent) = state.extent.lock().as_mut() {
2774                let packed = index.encode_for_extent(&dense);
2775                let _ = extent.append(&packed);
2776            }
2777        }
2778        self.cdc_emit(
2779            crate::replication::cdc::ChangeOperation::Insert,
2780            &input.collection,
2781            id.raw(),
2782            "vector",
2783        );
2784        Ok(CreateEntityOutput {
2785            id,
2786            entity: db.store().get(&input.collection, id),
2787        })
2788    }
2789
2790    fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2791        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2792        let db = self.db();
2793        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2794        contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2795
2796        if let JsonValue::Object(ref map) = input.body {
2797            crate::reserved_fields::ensure_no_reserved_public_item_fields(
2798                map.keys().map(String::as_str),
2799                &format!("document '{}'", input.collection),
2800            )?;
2801        }
2802
2803        // Serialize the full body as Value::Json for the "body" field
2804        let body_bytes = json_to_vec(&input.body).map_err(|err| {
2805            crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2806        })?;
2807        let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2808            "body".to_string(),
2809            crate::storage::schema::Value::Json(body_bytes),
2810        )];
2811
2812        // Flatten top-level keys from the body into named fields for filtering
2813        if let JsonValue::Object(ref map) = input.body {
2814            for (key, value) in map {
2815                let storage_value = json_to_storage_value(value)?;
2816                fields.push((key.clone(), storage_value));
2817            }
2818        }
2819
2820        let mut metadata = input.metadata;
2821        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2822        let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2823            .iter()
2824            .map(|(key, value)| (key.as_str(), value.clone()))
2825            .collect();
2826        let mut builder = db.row(&input.collection, columns);
2827
2828        for (key, value) in metadata {
2829            builder = builder.metadata(key, value);
2830        }
2831
2832        for node in input.node_links {
2833            builder = builder.link_to_node(node);
2834        }
2835
2836        for vector in input.vector_links {
2837            builder = builder.link_to_vector(vector);
2838        }
2839
2840        let id = builder.save()?;
2841        // Phase 1.1 MVCC universal: stamp xmin on the document.
2842        self.stamp_xmin_if_in_txn(&input.collection, id);
2843        refresh_context_index(&db, &input.collection, id)?;
2844        self.cdc_emit(
2845            crate::replication::cdc::ChangeOperation::Insert,
2846            &input.collection,
2847            id.raw(),
2848            "document",
2849        );
2850        Ok(CreateEntityOutput {
2851            id,
2852            entity: db.store().get(&input.collection, id),
2853        })
2854    }
2855
2856    fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2857        let db = self.db();
2858        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2859        let declared_model = db
2860            .collection_contract(&input.collection)
2861            .map(|contract| contract.declared_model);
2862        let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2863            contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2864            self.seal_vault_value(&input.collection, input.value)?
2865        } else {
2866            contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2867            input.value
2868        };
2869        let fields = vec![
2870            (
2871                "key".to_string(),
2872                crate::storage::schema::Value::text(input.key),
2873            ),
2874            ("value".to_string(), value),
2875        ];
2876        let collection = input.collection;
2877        let result = self.mutation_engine().apply(
2878            collection.clone(),
2879            vec![crate::runtime::mutation::MutationRow {
2880                fields,
2881                metadata: input.metadata,
2882                node_links: Vec::new(),
2883                vector_links: Vec::new(),
2884            }],
2885        )?;
2886        let id = result.ids[0];
2887        Ok(CreateEntityOutput {
2888            id,
2889            entity: db.store().get(&collection, id),
2890        })
2891    }
2892
2893    fn create_timeseries_point(
2894        &self,
2895        input: CreateTimeSeriesPointInput,
2896    ) -> RedDBResult<CreateEntityOutput> {
2897        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2898        let db = self.db();
2899        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2900        contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2901
2902        let mut fields = vec![
2903            (
2904                "metric".to_string(),
2905                crate::storage::schema::Value::text(input.metric),
2906            ),
2907            (
2908                "value".to_string(),
2909                crate::storage::schema::Value::Float(input.value),
2910            ),
2911        ];
2912
2913        if let Some(timestamp_ns) = input.timestamp_ns {
2914            fields.push((
2915                "timestamp".to_string(),
2916                crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2917            ));
2918        }
2919
2920        if !input.tags.is_empty() {
2921            let tags_json = JsonValue::Object(
2922                input
2923                    .tags
2924                    .into_iter()
2925                    .map(|(key, value)| (key, JsonValue::String(value)))
2926                    .collect(),
2927            );
2928            let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2929                crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2930            })?;
2931            fields.push((
2932                "tags".to_string(),
2933                crate::storage::schema::Value::Json(tags_bytes),
2934            ));
2935        }
2936
2937        let collection = input.collection;
2938        let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2939        // Phase 1.1 MVCC universal: stamp xmin on the point so
2940        // concurrent range scans hide it until COMMIT.
2941        self.stamp_xmin_if_in_txn(&collection, id);
2942        refresh_context_index(&db, &collection, id)?;
2943
2944        Ok(CreateEntityOutput {
2945            id,
2946            entity: db.store().get(&collection, id),
2947        })
2948    }
2949
2950    fn get_kv(
2951        &self,
2952        collection: &str,
2953        key: &str,
2954    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2955        let db = self.db();
2956        ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2957        let store = db.store();
2958        let Some(manager) = store.get_collection(collection) else {
2959            return Ok(None);
2960        };
2961        let entities = manager.query_all(|_| true);
2962        for entity in entities {
2963            if let crate::storage::EntityData::Row(ref row) = entity.data {
2964                if let Some(ref named) = row.named {
2965                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2966                        if &**k == key {
2967                            let value = named
2968                                .get("value")
2969                                .cloned()
2970                                .unwrap_or(crate::storage::schema::Value::Null);
2971                            return Ok(Some((value, entity.id)));
2972                        }
2973                    }
2974                }
2975            }
2976        }
2977        Ok(None)
2978    }
2979
2980    fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2981        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2982        let found = self.get_kv(collection, key)?;
2983        if let Some((_, id)) = found {
2984            let db = self.db();
2985            let store = db.store();
2986            let deleted = store
2987                .delete(collection, id)
2988                .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2989            if deleted {
2990                store.context_index().remove_entity(id);
2991            }
2992            Ok(deleted)
2993        } else {
2994            Ok(false)
2995        }
2996    }
2997
2998    fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2999        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3000        let PatchEntityInput {
3001            collection,
3002            id,
3003            payload,
3004            operations,
3005        } = input;
3006        let db = self.db();
3007        let store = db.store();
3008        let Some(manager) = store.get_collection(&collection) else {
3009            return Err(crate::RedDBError::NotFound(format!(
3010                "collection not found: {collection}"
3011            )));
3012        };
3013        let Some(entity) = manager.get(id) else {
3014            return Err(crate::RedDBError::NotFound(format!(
3015                "entity not found: {}",
3016                id.raw()
3017            )));
3018        };
3019        self.apply_loaded_patch_entity(collection, entity, payload, operations)
3020    }
3021
3022    fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
3023        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3024        let store = self.db().store();
3025        // Snapshot row fields before delete so we can mirror the removal
3026        // into every secondary index. The fetch is best-effort: if the
3027        // entity is already gone, the delete below is a no-op anyway.
3028        let pre_delete_fields = store
3029            .get(&input.collection, input.id)
3030            .as_ref()
3031            .map(entity_row_fields_snapshot)
3032            .unwrap_or_default();
3033        // Store delete first (source of truth). Crash between here and index removal
3034        // leaves the entity invisible to most queries but recoverable; the reverse
3035        // (remove index first, then crash) leaves the entity permanently orphaned.
3036        let deleted = store
3037            .delete(&input.collection, input.id)
3038            .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3039        if deleted {
3040            store.context_index().remove_entity(input.id);
3041            // Secondary index maintenance — surface only registry-shape
3042            // errors; missing-index removals are tolerated inside the call.
3043            if !pre_delete_fields.is_empty() {
3044                self.index_store_ref()
3045                    .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
3046                    .map_err(crate::RedDBError::Internal)?;
3047            }
3048            self.cdc_emit(
3049                crate::replication::cdc::ChangeOperation::Delete,
3050                &input.collection,
3051                input.id.raw(),
3052                "entity",
3053            );
3054        }
3055        Ok(DeleteEntityOutput {
3056            deleted,
3057            id: input.id,
3058        })
3059    }
3060}