Skip to main content

reddb_server/application/
ports_impls_entity.rs

1use std::collections::HashMap;
2
3use crate::application::entity::{
4    AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5    RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8    has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21    db: &crate::storage::unified::devx::RedDB,
22    collection: &str,
23    metadata: &mut Vec<(String, MetadataValue)>,
24) {
25    if has_internal_ttl_metadata(metadata) {
26        return;
27    }
28
29    let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30        return;
31    };
32
33    metadata.push((
34        "_ttl_ms".to_string(),
35        if default_ttl_ms <= i64::MAX as u64 {
36            MetadataValue::Int(default_ttl_ms as i64)
37        } else {
38            MetadataValue::Timestamp(default_ttl_ms)
39        },
40    ));
41}
42
43fn refresh_context_index(
44    db: &crate::storage::unified::devx::RedDB,
45    collection: &str,
46    id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48    let store = db.store();
49    let Some(entity) = store.get(collection, id) else {
50        return Ok(());
51    };
52
53    store.context_index().index_entity(collection, &entity);
54    Ok(())
55}
56
57/// Pull `(name, value)` pairs for every named column on a row entity.
58/// Returns empty if the entity is not a row, or if the row carries
59/// neither a `named` map nor a `schema` Arc — both of those mean the
60/// names aren't recoverable here, so secondary-index maintenance has
61/// nothing to act on. Used by the delete + update paths.
62pub(crate) fn entity_row_fields_snapshot(
63    entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65    let crate::storage::EntityData::Row(row) = &entity.data else {
66        return Vec::new();
67    };
68    if let Some(named) = &row.named {
69        return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70    }
71    if let Some(schema) = &row.schema {
72        return schema
73            .iter()
74            .zip(row.columns.iter())
75            .map(|(name, value)| (name.clone(), value.clone()))
76            .collect();
77    }
78    Vec::new()
79}
80
81fn ensure_collection_model_contract(
82    db: &crate::storage::unified::devx::RedDB,
83    collection: &str,
84    requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86    if let Some(contract) = db.collection_contract(collection) {
87        if !contract_enforces_model(&contract) {
88            return Ok(());
89        }
90        if collection_model_allows(contract.declared_model, requested_model) {
91            return Ok(());
92        }
93        return Err(crate::RedDBError::InvalidOperation(format!(
94            "collection '{}' is declared as '{}' and does not allow '{}' writes",
95            collection,
96            collection_model_name(contract.declared_model),
97            collection_model_name(requested_model)
98        )));
99    }
100
101    let now = implicit_contract_unix_ms();
102    db.save_collection_contract(crate::physical::CollectionContract {
103        name: collection.to_string(),
104        declared_model: requested_model,
105        schema_mode: crate::catalog::SchemaMode::Dynamic,
106        origin: crate::physical::ContractOrigin::Implicit,
107        version: 1,
108        created_at_unix_ms: now,
109        updated_at_unix_ms: now,
110        default_ttl_ms: db.collection_default_ttl_ms(collection),
111        vector_dimension: None,
112        vector_metric: None,
113        context_index_fields: Vec::new(),
114        declared_columns: Vec::new(),
115        table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116            .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117        timestamps_enabled: false,
118        context_index_enabled: false,
119        metrics_raw_retention_ms: None,
120        metrics_rollup_policies: Vec::new(),
121        metrics_tenant_identity: None,
122        metrics_namespace: None,
123        // Implicit contracts are created on first write — mutability
124        // is the default until the operator runs explicit DDL.
125        append_only: false,
126        subscriptions: Vec::new(),
127        analytics_config: Vec::new(),
128        session_key: None,
129        session_gap_ms: None,
130        retention_duration_ms: None,
131        analytical_storage: None,
132    })
133    .map(|_| ())
134    .map_err(|err| crate::RedDBError::Internal(err.to_string()))
135}
136
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch.
fn implicit_contract_unix_ms() -> u128 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
143
144fn collection_model_allows(
145    declared_model: crate::catalog::CollectionModel,
146    requested_model: crate::catalog::CollectionModel,
147) -> bool {
148    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
149}
150
151fn ensure_vector_dimension_contract(
152    db: &crate::storage::unified::devx::RedDB,
153    collection: &str,
154    actual_dimension: usize,
155) -> RedDBResult<()> {
156    let Some(expected_dimension) = db
157        .collection_contract(collection)
158        .and_then(|contract| contract.vector_dimension)
159    else {
160        return Ok(());
161    };
162    if expected_dimension == actual_dimension {
163        return Ok(());
164    }
165    Err(crate::RedDBError::Query(format!(
166        "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
167    )))
168}
169
170fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
171    match model {
172        crate::catalog::CollectionModel::Table => "table",
173        crate::catalog::CollectionModel::Document => "document",
174        crate::catalog::CollectionModel::Graph => "graph",
175        crate::catalog::CollectionModel::Vector => "vector",
176        crate::catalog::CollectionModel::Hll => "hll",
177        crate::catalog::CollectionModel::Sketch => "sketch",
178        crate::catalog::CollectionModel::Filter => "filter",
179        crate::catalog::CollectionModel::Kv => "kv",
180        crate::catalog::CollectionModel::Config => "config",
181        crate::catalog::CollectionModel::Vault => "vault",
182        crate::catalog::CollectionModel::Mixed => "mixed",
183        crate::catalog::CollectionModel::TimeSeries => "timeseries",
184        crate::catalog::CollectionModel::Queue => "queue",
185        crate::catalog::CollectionModel::Metrics => "metrics",
186    }
187}
188
/// A uniqueness constraint resolved from a collection contract.
#[derive(Clone)]
struct UniquenessRule {
    /// Constraint name, used in violation messages.
    name: String,
    /// Columns whose combined values must be unique.
    columns: Vec<String>,
    /// `true` for primary keys, whose columns must all be present and non-null.
    primary_key: bool,
}
195
/// Which write path is normalizing a row against its collection contract.
#[derive(Clone, Copy, PartialEq, Eq)]
enum NormalizeMode {
    /// First write of a row. On timestamp-managed collections `created_at`
    /// and `updated_at` are filled with the current time, and a payload that
    /// mentions either column is rejected.
    Insert,
    /// Update/patch of an existing row. On timestamp-managed collections a
    /// `created_at` value carried in the payload is kept as-is and
    /// `updated_at` is replaced with the current time.
    Update,
}
207
208mod collection_contract_enforcement {
209    use super::*;
210
211    pub(super) struct CollectionContractWriteEnforcer<'a> {
212        db: &'a crate::storage::unified::devx::RedDB,
213        collection: &'a str,
214    }
215
216    impl<'a> CollectionContractWriteEnforcer<'a> {
217        pub(super) fn new(
218            db: &'a crate::storage::unified::devx::RedDB,
219            collection: &'a str,
220        ) -> Self {
221            Self { db, collection }
222        }
223
224        pub(super) fn ensure_model(
225            &self,
226            requested_model: crate::catalog::CollectionModel,
227        ) -> RedDBResult<()> {
228            ensure_collection_model_contract(self.db, self.collection, requested_model)
229        }
230
231        pub(super) fn normalize_insert_fields(
232            &self,
233            fields: Vec<(String, Value)>,
234        ) -> RedDBResult<Vec<(String, Value)>> {
235            normalize_row_fields_for_contract_with_mode(
236                self.db,
237                self.collection,
238                fields,
239                NormalizeMode::Insert,
240            )
241        }
242
243        pub(super) fn normalize_update_fields(
244            &self,
245            fields: Vec<(String, Value)>,
246        ) -> RedDBResult<Vec<(String, Value)>> {
247            normalize_row_fields_for_contract_with_mode(
248                self.db,
249                self.collection,
250                fields,
251                NormalizeMode::Update,
252            )
253        }
254
255        pub(super) fn enforce_row_uniqueness(
256            &self,
257            fields: &[(String, Value)],
258            exclude_id: Option<crate::storage::EntityId>,
259        ) -> RedDBResult<()> {
260            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
261        }
262
263        pub(super) fn enforce_batch_uniqueness(
264            &self,
265            rows: &[Vec<(String, Value)>],
266        ) -> RedDBResult<()> {
267            enforce_row_batch_uniqueness(self.db, self.collection, rows)
268        }
269
270        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
271            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
272        }
273    }
274}
275
276use collection_contract_enforcement::CollectionContractWriteEnforcer;
277
278fn normalize_row_fields_for_contract_with_mode(
279    db: &crate::storage::unified::devx::RedDB,
280    collection: &str,
281    fields: Vec<(String, Value)>,
282    mode: NormalizeMode,
283) -> RedDBResult<Vec<(String, Value)>> {
284    let Some(contract) = db.collection_contract(collection) else {
285        return Ok(fields);
286    };
287
288    if contract.declared_model != crate::catalog::CollectionModel::Table
289        || (contract.declared_columns.is_empty()
290            && contract
291                .table_def
292                .as_ref()
293                .map(|table| table.columns.is_empty())
294                .unwrap_or(true))
295    {
296        return Ok(fields);
297    }
298
299    // Capture the pre-normalize value of created_at (if present) so
300    // Update mode can preserve it. Also capture updated_at to detect
301    // user attempts to set it via the patch payload.
302    //
303    // Heuristic for Update mode: if fields ALREADY contains a
304    // `created_at` whose value matches the row's on-disk entity, the
305    // caller is the patch pipeline carrying forward an auto-populated
306    // column — not a user mutation. Allow pass-through in that case,
307    // then restore the original value at the end.
308    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
309        fields
310            .iter()
311            .find(|(n, _)| n == "created_at")
312            .map(|(_, v)| v.clone())
313    } else {
314        None
315    };
316
317    // Reject user attempts to set runtime-managed timestamp columns.
318    // On Insert we reject any mention; on Update we only reject when
319    // the patch pipeline handed us a NEW value (not the one we
320    // auto-populated during the last insert).
321    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
322        for (name, _) in &fields {
323            if name == "created_at" || name == "updated_at" {
324                return Err(crate::RedDBError::Query(format!(
325                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
326                    collection, name
327                )));
328            }
329        }
330    }
331
332    let mut provided = std::collections::BTreeMap::new();
333    for (name, value) in &fields {
334        provided.insert(name.clone(), value.clone());
335    }
336
337    let resolved_columns = resolved_contract_columns(&contract)?;
338    let declared_names: std::collections::BTreeSet<String> = resolved_columns
339        .iter()
340        .map(|column| column.name.clone())
341        .collect();
342    let unknown_fields: Vec<String> = fields
343        .iter()
344        .filter(|(name, _)| !declared_names.contains(name))
345        .map(|(name, _)| name.clone())
346        .collect();
347    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
348        && !unknown_fields.is_empty()
349    {
350        return Err(crate::RedDBError::Query(format!(
351            "collection '{}' is strict and does not allow undeclared fields: {}",
352            collection,
353            unknown_fields.join(", ")
354        )));
355    }
356    let mut normalized = Vec::new();
357    let now_ms = current_unix_ms_u64();
358
359    for column in &resolved_columns {
360        match provided.remove(&column.name) {
361            Some(value) => {
362                // Runtime-managed columns on Update: always overwrite
363                // with the runtime's own value (preserved created_at
364                // or fresh updated_at). User mutations are silently
365                // discarded because we reject them earlier.
366                if contract.timestamps_enabled && mode == NormalizeMode::Update {
367                    match column.name.as_str() {
368                        "created_at" => {
369                            normalized.push((
370                                column.name.clone(),
371                                existing_created_at
372                                    .clone()
373                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
374                            ));
375                            continue;
376                        }
377                        "updated_at" => {
378                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
379                            continue;
380                        }
381                        _ => {}
382                    }
383                }
384                normalized.push((
385                    column.name.clone(),
386                    normalize_contract_value(collection, column, value)?,
387                ));
388            }
389            None => {
390                // Runtime-managed timestamp columns: auto-fill with now
391                // when the contract opted in. Both get the same value on
392                // first insert so callers can order by either.
393                if contract.timestamps_enabled
394                    && (column.name == "created_at" || column.name == "updated_at")
395                {
396                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
397                    continue;
398                }
399                if let Some(default) = &column.default {
400                    normalized.push((
401                        column.name.clone(),
402                        coerce_contract_literal(collection, &column.name, column, default)?,
403                    ));
404                } else if column.not_null {
405                    return Err(crate::RedDBError::Query(format!(
406                        "missing required column '{}' for collection '{}'",
407                        column.name, collection
408                    )));
409                }
410            }
411        }
412    }
413
414    for (name, value) in fields {
415        if !declared_names.contains(&name) {
416            normalized.push((name, value));
417        }
418    }
419
420    Ok(normalized)
421}
422
/// Current wall-clock time in milliseconds since the Unix epoch, as a `u64`.
///
/// Returns `0` when the system clock reads earlier than the epoch. A value
/// that does not fit in `u64` saturates to `u64::MAX` instead of silently
/// wrapping, which is what the previous `as u64` cast did.
fn current_unix_ms_u64() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}
429
430fn enforce_row_uniqueness(
431    db: &crate::storage::unified::devx::RedDB,
432    collection: &str,
433    fields: &[(String, Value)],
434    exclude_id: Option<crate::storage::EntityId>,
435) -> RedDBResult<()> {
436    let Some(contract) = db.collection_contract(collection) else {
437        return Ok(());
438    };
439    if contract.declared_model != crate::catalog::CollectionModel::Table
440        && contract.declared_model != crate::catalog::CollectionModel::Mixed
441    {
442        return Ok(());
443    }
444
445    let rules = resolved_uniqueness_rules(&contract);
446    if rules.is_empty() {
447        return Ok(());
448    }
449
450    let store = db.store();
451    let Some(manager) = store.get_collection(collection) else {
452        return Ok(());
453    };
454
455    let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
456
457    for rule in &rules {
458        let mut expected_signatures = Vec::new();
459        let mut skip_rule = false;
460
461        for column in &rule.columns {
462            match input_fields.get(column) {
463                Some(Value::Null) | None if rule.primary_key => {
464                    return Err(crate::RedDBError::Query(format!(
465                        "primary key '{}' in collection '{}' requires non-null column '{}'",
466                        rule.name, collection, column
467                    )))
468                }
469                Some(Value::Null) | None => {
470                    skip_rule = true;
471                    break;
472                }
473                Some(value) => {
474                    expected_signatures.push((column.clone(), value_signature(value)));
475                }
476            }
477        }
478
479        if skip_rule {
480            continue;
481        }
482
483        for entity in manager.query_all(|_| true) {
484            if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
485                continue;
486            }
487            let Some(existing_fields) = row_fields_from_entity(&entity) else {
488                continue;
489            };
490
491            let duplicate = expected_signatures.iter().all(|(column, expected)| {
492                existing_fields
493                    .get(column)
494                    .map(|value| value_signature(value) == *expected)
495                    .unwrap_or(false)
496            });
497
498            if duplicate {
499                let qualifier = if rule.primary_key {
500                    "primary key"
501                } else {
502                    "unique constraint"
503                };
504                return Err(crate::RedDBError::Query(format!(
505                    "{} '{}' violated on collection '{}' for columns [{}]",
506                    qualifier,
507                    rule.name,
508                    collection,
509                    rule.columns.join(", ")
510                )));
511            }
512        }
513    }
514
515    Ok(())
516}
517
518fn enforce_row_batch_uniqueness(
519    db: &crate::storage::unified::devx::RedDB,
520    collection: &str,
521    rows: &[Vec<(String, Value)>],
522) -> RedDBResult<()> {
523    let Some(contract) = db.collection_contract(collection) else {
524        return Ok(());
525    };
526    if contract.declared_model != crate::catalog::CollectionModel::Table
527        && contract.declared_model != crate::catalog::CollectionModel::Mixed
528    {
529        return Ok(());
530    }
531
532    let rules = resolved_uniqueness_rules(&contract);
533    if rules.is_empty() {
534        return Ok(());
535    }
536
537    for rule in &rules {
538        let mut seen = std::collections::HashMap::<String, usize>::new();
539        for (row_index, fields) in rows.iter().enumerate() {
540            let input_fields: std::collections::BTreeMap<String, Value> =
541                fields.iter().cloned().collect();
542            let mut signatures = Vec::new();
543            let mut skip_rule = false;
544
545            for column in &rule.columns {
546                match input_fields.get(column) {
547                    Some(Value::Null) | None if rule.primary_key => {
548                        return Err(crate::RedDBError::Query(format!(
549                            "primary key '{}' in collection '{}' requires non-null column '{}'",
550                            rule.name, collection, column
551                        )))
552                    }
553                    Some(Value::Null) | None => {
554                        skip_rule = true;
555                        break;
556                    }
557                    Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
558                }
559            }
560
561            if skip_rule {
562                continue;
563            }
564
565            let signature = signatures.join("|");
566            if let Some(previous_index) = seen.insert(signature, row_index) {
567                return Err(crate::RedDBError::Query(format!(
568                    "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
569                    rule.name,
570                    collection,
571                    previous_index + 1,
572                    row_index + 1
573                )));
574            }
575        }
576    }
577
578    Ok(())
579}
580
581fn row_update_requires_uniqueness_check(
582    db: &crate::storage::unified::devx::RedDB,
583    collection: &str,
584    modified_columns: &[String],
585) -> bool {
586    if modified_columns.is_empty() {
587        return false;
588    }
589
590    let Some(contract) = db.collection_contract(collection) else {
591        return false;
592    };
593    if contract.declared_model != crate::catalog::CollectionModel::Table
594        && contract.declared_model != crate::catalog::CollectionModel::Mixed
595    {
596        return false;
597    }
598
599    let rules = resolved_uniqueness_rules(&contract);
600    if rules.is_empty() {
601        return false;
602    }
603
604    rules.iter().any(|rule| {
605        rule.columns.iter().any(|column| {
606            modified_columns
607                .iter()
608                .any(|modified| modified.eq_ignore_ascii_case(column))
609        })
610    })
611}
612
613pub(crate) fn build_row_update_contract_plan(
614    db: &crate::storage::unified::devx::RedDB,
615    collection: &str,
616) -> RedDBResult<Option<RowUpdateContractPlan>> {
617    let Some(contract) = db.collection_contract(collection) else {
618        return Ok(None);
619    };
620
621    let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
622        && !(contract.declared_columns.is_empty()
623            && contract
624                .table_def
625                .as_ref()
626                .map(|table| table.columns.is_empty())
627                .unwrap_or(true))
628    {
629        resolved_contract_columns(&contract)?
630            .into_iter()
631            .map(|rule| {
632                (
633                    rule.name.clone(),
634                    RowUpdateColumnRule {
635                        name: rule.name,
636                        data_type: rule.data_type,
637                        data_type_name: rule.data_type_name,
638                        not_null: rule.not_null,
639                        enum_variants: rule.enum_variants,
640                    },
641                )
642            })
643            .collect()
644    } else {
645        HashMap::new()
646    };
647
648    let unique_columns = if matches!(
649        contract.declared_model,
650        crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
651    ) {
652        resolved_uniqueness_rules(&contract)
653            .into_iter()
654            .flat_map(|rule| rule.columns.into_iter())
655            .map(|column| (column, ()))
656            .collect()
657    } else {
658        HashMap::new()
659    };
660
661    Ok(Some(RowUpdateContractPlan {
662        timestamps_enabled: contract.timestamps_enabled,
663        strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
664        declared_rules,
665        unique_columns,
666    }))
667}
668
669pub(crate) fn normalize_row_update_assignment_with_plan(
670    collection: &str,
671    column: &str,
672    value: Value,
673    row_contract_plan: Option<&RowUpdateContractPlan>,
674) -> RedDBResult<Value> {
675    let Some(plan) = row_contract_plan else {
676        return Ok(value);
677    };
678
679    if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
680        return Err(crate::RedDBError::Query(format!(
681            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
682            collection, column
683        )));
684    }
685
686    if let Some(rule) = plan.declared_rules.get(column) {
687        let rule = ResolvedColumnRule {
688            name: rule.name.clone(),
689            data_type: rule.data_type,
690            data_type_name: rule.data_type_name.clone(),
691            not_null: rule.not_null,
692            default: None,
693            enum_variants: rule.enum_variants.clone(),
694        };
695        normalize_contract_value(collection, &rule, value)
696    } else if plan.strict_schema {
697        Err(crate::RedDBError::Query(format!(
698            "collection '{}' is strict and does not allow undeclared fields: {}",
699            collection, column
700        )))
701    } else {
702        Ok(value)
703    }
704}
705
706pub(crate) fn normalize_row_update_value_for_rule(
707    collection: &str,
708    value: Value,
709    row_rule: Option<&RowUpdateColumnRule>,
710) -> RedDBResult<Value> {
711    let Some(rule) = row_rule else {
712        return Ok(value);
713    };
714
715    let rule = ResolvedColumnRule {
716        name: rule.name.clone(),
717        data_type: rule.data_type,
718        data_type_name: rule.data_type_name.clone(),
719        not_null: rule.not_null,
720        default: None,
721        enum_variants: rule.enum_variants.clone(),
722    };
723    normalize_contract_value(collection, &rule, value)
724}
725
726fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
727    if let Some(named) = row.named.as_mut() {
728        named.insert(name.to_string(), value);
729        return;
730    }
731
732    if let Some(schema) = row.schema.as_ref() {
733        if let Some(index) = schema.iter().position(|column| column == name) {
734            if let Some(slot) = row.columns.get_mut(index) {
735                *slot = value;
736                return;
737            }
738        }
739
740        let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
741        for (column, current) in schema.iter().zip(row.columns.iter()) {
742            named.insert(column.clone(), current.clone());
743        }
744        named.insert(name.to_string(), value);
745        row.named = Some(named);
746        return;
747    }
748
749    let mut named = HashMap::with_capacity(1);
750    named.insert(name.to_string(), value);
751    row.named = Some(named);
752}
753
754fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
755    if let Some(named) = row.named.as_ref() {
756        named
757            .iter()
758            .map(|(key, value)| (key.clone(), value.clone()))
759            .collect()
760    } else if let Some(schema) = row.schema.as_ref() {
761        schema
762            .iter()
763            .cloned()
764            .zip(row.columns.iter().cloned())
765            .collect()
766    } else {
767        Vec::new()
768    }
769}
770
771fn apply_row_field_assignments_raw<I>(
772    row: &mut crate::storage::unified::entity::RowData,
773    field_assignments: I,
774) where
775    I: IntoIterator<Item = (String, Value)>,
776{
777    for (column, value) in field_assignments {
778        set_row_field(row, &column, value);
779    }
780}
781
782fn apply_row_field_assignments_incremental<I>(
783    collection: &str,
784    row: &mut crate::storage::unified::entity::RowData,
785    field_assignments: I,
786    row_contract_plan: Option<&RowUpdateContractPlan>,
787) -> RedDBResult<()>
788where
789    I: IntoIterator<Item = (String, Value)>,
790{
791    for (column, value) in field_assignments {
792        let value = normalize_row_update_assignment_with_plan(
793            collection,
794            &column,
795            value,
796            row_contract_plan,
797        )?;
798
799        set_row_field(row, &column, value);
800    }
801
802    Ok(())
803}
804
805fn resolved_uniqueness_rules(
806    contract: &crate::physical::CollectionContract,
807) -> Vec<UniquenessRule> {
808    let mut rules = Vec::new();
809
810    if let Some(table_def) = &contract.table_def {
811        if !table_def.primary_key.is_empty() {
812            rules.push(UniquenessRule {
813                name: "primary_key".to_string(),
814                columns: table_def.primary_key.clone(),
815                primary_key: true,
816            });
817        }
818
819        for constraint in &table_def.constraints {
820            if matches!(
821                constraint.constraint_type,
822                crate::storage::schema::ConstraintType::PrimaryKey
823            ) && !constraint.columns.is_empty()
824            {
825                rules.push(UniquenessRule {
826                    name: constraint.name.clone(),
827                    columns: constraint.columns.clone(),
828                    primary_key: true,
829                });
830            } else if matches!(
831                constraint.constraint_type,
832                crate::storage::schema::ConstraintType::Unique
833            ) && !constraint.columns.is_empty()
834            {
835                rules.push(UniquenessRule {
836                    name: constraint.name.clone(),
837                    columns: constraint.columns.clone(),
838                    primary_key: false,
839                });
840            }
841        }
842    } else {
843        for column in &contract.declared_columns {
844            if column.primary_key {
845                rules.push(UniquenessRule {
846                    name: format!("pk_{}", column.name),
847                    columns: vec![column.name.clone()],
848                    primary_key: true,
849                });
850            } else if column.unique {
851                rules.push(UniquenessRule {
852                    name: format!("uniq_{}", column.name),
853                    columns: vec![column.name.clone()],
854                    primary_key: false,
855                });
856            }
857        }
858    }
859
860    let mut dedup = std::collections::BTreeSet::new();
861    rules
862        .into_iter()
863        .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
864        .collect()
865}
866
867fn row_fields_from_entity(
868    entity: &crate::storage::UnifiedEntity,
869) -> Option<std::collections::BTreeMap<String, Value>> {
870    match &entity.data {
871        crate::storage::EntityData::Row(row) => {
872            if let Some(named) = &row.named {
873                Some(
874                    named
875                        .iter()
876                        .map(|(key, value)| (key.clone(), value.clone()))
877                        .collect(),
878                )
879            } else {
880                row.schema.as_ref().map(|schema| {
881                    schema
882                        .iter()
883                        .cloned()
884                        .zip(row.columns.iter().cloned())
885                        .collect()
886                })
887            }
888        }
889        _ => None,
890    }
891}
892
893fn value_signature(value: &Value) -> String {
894    format!("{value:?}")
895}
896
897fn normalize_contract_value(
898    collection: &str,
899    column: &ResolvedColumnRule,
900    value: Value,
901) -> RedDBResult<Value> {
902    if matches!(value, Value::Null) {
903        if column.not_null {
904            return Err(crate::RedDBError::Query(format!(
905                "column '{}' in collection '{}' cannot be null",
906                column.name, collection
907            )));
908        }
909        return Ok(Value::Null);
910    }
911
912    let target = column.data_type;
913    if value_matches_declared_type(&value, target) {
914        return Ok(value);
915    }
916
917    let Some(raw) = value_to_coercion_input(&value) else {
918        return Err(crate::RedDBError::Query(format!(
919            "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
920            column.name, collection, column.data_type_name
921        )));
922    };
923
924    coerce_contract_literal(collection, &column.name, column, &raw)
925}
926
927fn coerce_contract_literal(
928    collection: &str,
929    column_name: &str,
930    column: &ResolvedColumnRule,
931    raw: &str,
932) -> RedDBResult<Value> {
933    let target = column.data_type;
934    match target {
935        DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
936        DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
937        DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
938            crate::RedDBError::Query(format!(
939                "failed to coerce column '{}' in collection '{}' to '{}': {}",
940                column_name, collection, column.data_type_name, err
941            ))
942        }),
943        DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
944            crate::RedDBError::Query(format!(
945                "failed to coerce column '{}' in collection '{}' to '{}': {}",
946                column_name, collection, column.data_type_name, err
947            ))
948        }),
949        DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
950            "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
951            column_name, collection, column.data_type_name
952        ))),
953        _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
954            |err| {
955                crate::RedDBError::Query(format!(
956                    "failed to coerce column '{}' in collection '{}' to '{}': {}",
957                    column_name, collection, column.data_type_name, err
958                ))
959            },
960        ),
961    }
962}
963
/// Per-column write rule derived from a collection contract by
/// `resolved_contract_columns`.
struct ResolvedColumnRule {
    /// Column name as declared in the contract.
    name: String,
    /// Resolved storage type the column's values are checked/coerced against.
    data_type: DataType,
    /// Human-readable type name used in error messages.
    data_type_name: String,
    /// When `true`, `Value::Null` is rejected for this column.
    not_null: bool,
    /// Declared default as text, if any (table-def defaults are decoded
    /// from bytes with lossy UTF-8 conversion).
    default: Option<String>,
    /// Enum variants forwarded to `coerce_schema_value` during coercion.
    enum_variants: Vec<String>,
}
972
973fn resolved_contract_columns(
974    contract: &crate::physical::CollectionContract,
975) -> RedDBResult<Vec<ResolvedColumnRule>> {
976    if let Some(table_def) = &contract.table_def {
977        return Ok(table_def
978            .columns
979            .iter()
980            .map(|column| ResolvedColumnRule {
981                name: column.name.clone(),
982                data_type: column.data_type,
983                data_type_name: data_type_name(column.data_type).to_string(),
984                not_null: !column.nullable,
985                default: column
986                    .default
987                    .as_ref()
988                    .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
989                enum_variants: column.enum_variants.clone(),
990            })
991            .collect());
992    }
993
994    contract
995        .declared_columns
996        .iter()
997        .map(|column| {
998            let data_type = column
999                .sql_type
1000                .as_ref()
1001                .map(crate::storage::query::resolve_sql_type_name)
1002                .transpose()
1003                .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1004                .unwrap_or(parse_declared_data_type(&column.data_type)?);
1005            Ok(ResolvedColumnRule {
1006                name: column.name.clone(),
1007                data_type,
1008                data_type_name: column.data_type.clone(),
1009                not_null: column.not_null,
1010                default: column.default.clone(),
1011                enum_variants: column.enum_variants.clone(),
1012            })
1013        })
1014        .collect()
1015}
1016
1017fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1018    resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1019}
1020
1021fn data_type_name(data_type: DataType) -> &'static str {
1022    match data_type {
1023        DataType::Integer => "integer",
1024        DataType::UnsignedInteger => "unsigned_integer",
1025        DataType::Float => "float",
1026        DataType::Text => "text",
1027        DataType::Blob => "blob",
1028        DataType::Boolean => "boolean",
1029        DataType::Timestamp => "timestamp",
1030        DataType::Duration => "duration",
1031        DataType::IpAddr => "ipaddr",
1032        DataType::MacAddr => "macaddr",
1033        DataType::Vector => "vector",
1034        DataType::Nullable => "nullable",
1035        DataType::Unknown => "unknown",
1036        DataType::Json => "json",
1037        DataType::Uuid => "uuid",
1038        DataType::NodeRef => "noderef",
1039        DataType::EdgeRef => "edgeref",
1040        DataType::VectorRef => "vectorref",
1041        DataType::RowRef => "rowref",
1042        DataType::Color => "color",
1043        DataType::Email => "email",
1044        DataType::Url => "url",
1045        DataType::Phone => "phone",
1046        DataType::Semver => "semver",
1047        DataType::Cidr => "cidr",
1048        DataType::Date => "date",
1049        DataType::Time => "time",
1050        DataType::Decimal => "decimal",
1051        DataType::Enum => "enum",
1052        DataType::Array => "array",
1053        DataType::TimestampMs => "timestamp_ms",
1054        DataType::Ipv4 => "ipv4",
1055        DataType::Ipv6 => "ipv6",
1056        DataType::Subnet => "subnet",
1057        DataType::Port => "port",
1058        DataType::Latitude => "latitude",
1059        DataType::Longitude => "longitude",
1060        DataType::GeoPoint => "geopoint",
1061        DataType::Country2 => "country2",
1062        DataType::Country3 => "country3",
1063        DataType::Lang2 => "lang2",
1064        DataType::Lang5 => "lang5",
1065        DataType::Currency => "currency",
1066        DataType::AssetCode => "asset_code",
1067        DataType::Money => "money",
1068        DataType::ColorAlpha => "color_alpha",
1069        DataType::BigInt => "bigint",
1070        DataType::KeyRef => "keyref",
1071        DataType::DocRef => "docref",
1072        DataType::TableRef => "tableref",
1073        DataType::PageRef => "pageref",
1074        DataType::Secret => "secret",
1075        DataType::Password => "password",
1076        DataType::TextZstd => "text",
1077        DataType::BlobZstd => "blob",
1078    }
1079}
1080
1081fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1082    matches!(
1083        (value, target),
1084        (Value::Null, _)
1085            | (Value::Integer(_), DataType::Integer)
1086            | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1087            | (Value::Float(_), DataType::Float)
1088            | (Value::Text(_), DataType::Text)
1089            | (Value::Blob(_), DataType::Blob)
1090            | (Value::Boolean(_), DataType::Boolean)
1091            | (Value::Timestamp(_), DataType::Timestamp)
1092            | (Value::Duration(_), DataType::Duration)
1093            | (Value::IpAddr(_), DataType::IpAddr)
1094            | (Value::MacAddr(_), DataType::MacAddr)
1095            | (Value::Vector(_), DataType::Vector)
1096            | (Value::Json(_), DataType::Json)
1097            | (Value::Uuid(_), DataType::Uuid)
1098            | (Value::NodeRef(_), DataType::NodeRef)
1099            | (Value::EdgeRef(_), DataType::EdgeRef)
1100            | (Value::VectorRef(_, _), DataType::VectorRef)
1101            | (Value::RowRef(_, _), DataType::RowRef)
1102            | (Value::Color(_), DataType::Color)
1103            | (Value::Email(_), DataType::Email)
1104            | (Value::Url(_), DataType::Url)
1105            | (Value::Phone(_), DataType::Phone)
1106            | (Value::Semver(_), DataType::Semver)
1107            | (Value::Cidr(_, _), DataType::Cidr)
1108            | (Value::Date(_), DataType::Date)
1109            | (Value::Time(_), DataType::Time)
1110            | (Value::Decimal(_), DataType::Decimal)
1111            | (Value::EnumValue(_), DataType::Enum)
1112            | (Value::Array(_), DataType::Array)
1113            | (Value::TimestampMs(_), DataType::TimestampMs)
1114            | (Value::Ipv4(_), DataType::Ipv4)
1115            | (Value::Ipv6(_), DataType::Ipv6)
1116            | (Value::Subnet(_, _), DataType::Subnet)
1117            | (Value::Port(_), DataType::Port)
1118            | (Value::Latitude(_), DataType::Latitude)
1119            | (Value::Longitude(_), DataType::Longitude)
1120            | (Value::GeoPoint(_, _), DataType::GeoPoint)
1121            | (Value::Country2(_), DataType::Country2)
1122            | (Value::Country3(_), DataType::Country3)
1123            | (Value::Lang2(_), DataType::Lang2)
1124            | (Value::Lang5(_), DataType::Lang5)
1125            | (Value::Currency(_), DataType::Currency)
1126            | (Value::ColorAlpha(_), DataType::ColorAlpha)
1127            | (Value::BigInt(_), DataType::BigInt)
1128            | (Value::KeyRef(_, _), DataType::KeyRef)
1129            | (Value::DocRef(_, _), DataType::DocRef)
1130            | (Value::TableRef(_), DataType::TableRef)
1131            | (Value::PageRef(_), DataType::PageRef)
1132            | (Value::Secret(_), DataType::Secret)
1133            | (Value::Password(_), DataType::Password)
1134    )
1135}
1136
1137fn value_to_coercion_input(value: &Value) -> Option<String> {
1138    match value {
1139        Value::Null => None,
1140        Value::Integer(value) => Some(value.to_string()),
1141        Value::UnsignedInteger(value) => Some(value.to_string()),
1142        Value::Float(value) => Some(value.to_string()),
1143        Value::Text(value) => Some(value.to_string()),
1144        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1145        Value::Boolean(value) => Some(value.to_string()),
1146        Value::Timestamp(value) => Some(value.to_string()),
1147        Value::Duration(value) => Some(value.to_string()),
1148        Value::IpAddr(value) => Some(value.to_string()),
1149        Value::MacAddr(value) => Some(format!(
1150            "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1151            value[0], value[1], value[2], value[3], value[4], value[5]
1152        )),
1153        Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1154        Value::Email(value) => Some(value.clone()),
1155        Value::Url(value) => Some(value.clone()),
1156        Value::Phone(value) => Some(value.to_string()),
1157        Value::Semver(value) => Some(format!(
1158            "{}.{}.{}",
1159            value / 1_000_000,
1160            (value / 1_000) % 1_000,
1161            value % 1_000
1162        )),
1163        Value::Date(value) => Some(value.to_string()),
1164        Value::Time(value) => Some(value.to_string()),
1165        Value::Decimal(value) => Some(value.to_string()),
1166        Value::TimestampMs(value) => Some(value.to_string()),
1167        Value::Ipv4(value) => Some(format!(
1168            "{}.{}.{}.{}",
1169            (value >> 24) & 0xFF,
1170            (value >> 16) & 0xFF,
1171            (value >> 8) & 0xFF,
1172            value & 0xFF
1173        )),
1174        Value::Port(value) => Some(value.to_string()),
1175        Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1176        Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1177        Value::GeoPoint(lat, lon) => Some(format!(
1178            "{},{}",
1179            *lat as f64 / 1_000_000.0,
1180            *lon as f64 / 1_000_000.0
1181        )),
1182        Value::BigInt(value) => Some(value.to_string()),
1183        Value::TableRef(value) => Some(value.clone()),
1184        Value::PageRef(value) => Some(value.to_string()),
1185        Value::Password(value) => Some(value.clone()),
1186        _ => None,
1187    }
1188}
1189
/// Removes duplicate column names, comparing ASCII case-insensitively.
///
/// The first spelling of each name is kept and the original relative order
/// is preserved. An empty input yields an empty output.
fn dedupe_modified_columns(modified_columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(modified_columns.len());
    for candidate in modified_columns {
        let already_listed = kept
            .iter()
            .any(|prior| prior.eq_ignore_ascii_case(&candidate));
        if !already_listed {
            kept.push(candidate);
        }
    }
    kept
}
1206
1207fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1208    if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1209        return Err(crate::RedDBError::Query(
1210            "array positional document patch paths are unsupported; replace the array or full document body instead"
1211                .to_string(),
1212        ));
1213    }
1214    Ok(())
1215}
1216
1217fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1218    match fields.get("body") {
1219        Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1220            crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1221        }),
1222        Some(_) => Err(crate::RedDBError::Query(
1223            "document body field must contain JSON".to_string(),
1224        )),
1225        None => Ok(JsonValue::Object(Default::default())),
1226    }
1227}
1228
1229fn replace_document_row_body(
1230    fields: &mut HashMap<String, Value>,
1231    body: JsonValue,
1232    modified_columns: &mut Vec<String>,
1233) -> RedDBResult<()> {
1234    modified_columns.push("body".to_string());
1235
1236    let old_keys: Vec<String> = fields
1237        .keys()
1238        .filter(|key| key.as_str() != "body")
1239        .cloned()
1240        .collect();
1241    for key in old_keys {
1242        fields.remove(&key);
1243        modified_columns.push(key);
1244    }
1245
1246    let body_bytes = json_to_vec(&body).map_err(|err| {
1247        crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1248    })?;
1249    fields.insert("body".to_string(), Value::Json(body_bytes));
1250
1251    if let JsonValue::Object(map) = &body {
1252        for (key, value) in map {
1253            fields.insert(key.clone(), json_to_storage_value(value)?);
1254            modified_columns.push(key.clone());
1255        }
1256    }
1257
1258    Ok(())
1259}
1260
1261impl RedDBRuntime {
1262    pub(crate) fn apply_loaded_patch_entity_core(
1263        &self,
1264        collection: String,
1265        mut entity: crate::storage::UnifiedEntity,
1266        payload: JsonValue,
1267        operations: Vec<PatchEntityOperation>,
1268    ) -> RedDBResult<AppliedEntityMutation> {
1269        let id = entity.id;
1270        let operations = normalize_ttl_patch_operations(operations)?;
1271        // Snapshot pre-patch row fields for the secondary-index hook —
1272        // empty for non-row entities, which is the desired no-op.
1273        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1274
1275        let db = self.db();
1276        let store = db.store();
1277        let Some(manager) = store.get_collection(&collection) else {
1278            return Err(crate::RedDBError::NotFound(format!(
1279                "collection not found: {collection}"
1280            )));
1281        };
1282
1283        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1284        let mut metadata_changed = false;
1285        let mut modified_columns: Vec<String> = Vec::new();
1286        let mut context_index_dirty = false;
1287        let mut graph_node_type: Option<String> = None;
1288        let mut graph_edge_weight: Option<f32> = None;
1289
1290        let row_contract_timestamps = db
1291            .collection_contract(&collection)
1292            .map(|c| c.timestamps_enabled)
1293            .unwrap_or(false);
1294
1295        match &mut entity.data {
1296            crate::storage::EntityData::Row(row) => {
1297                let is_document_collection = db
1298                    .collection_contract(&collection)
1299                    .map(|contract| {
1300                        contract.declared_model == crate::catalog::CollectionModel::Document
1301                    })
1302                    .unwrap_or(false);
1303                let mut field_ops = Vec::new();
1304                let mut metadata_ops = Vec::new();
1305                let mut document_body_ops = Vec::new();
1306
1307                for mut op in operations {
1308                    let Some(root) = op.path.first().map(String::as_str) else {
1309                        return Err(crate::RedDBError::Query(
1310                            "patch path cannot be empty".to_string(),
1311                        ));
1312                    };
1313
1314                    match root {
1315                        "body" if is_document_collection => {
1316                            if op.path.len() < 2 {
1317                                return Err(crate::RedDBError::Query(
1318                                    "document body patch paths require a nested key; use payload.body for full replacement"
1319                                        .to_string(),
1320                                ));
1321                            }
1322                            op.path.remove(0);
1323                            reject_document_array_position_path(&op.path)?;
1324                            document_body_ops.push(op);
1325                        }
1326                        "fields" | "named" => {
1327                            if op.path.len() < 2 {
1328                                return Err(crate::RedDBError::Query(
1329                                    "patch path 'fields' requires a nested key".to_string(),
1330                                ));
1331                            }
1332                            if row_contract_timestamps {
1333                                let leaf = op.path.get(1).map(String::as_str);
1334                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
1335                                    return Err(crate::RedDBError::Query(format!(
1336                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1337                                        collection,
1338                                        leaf.unwrap_or("")
1339                                    )));
1340                                }
1341                            }
1342                            op.path.remove(0);
1343                            field_ops.push(op);
1344                        }
1345                        "metadata" => {
1346                            if op.path.len() < 2 {
1347                                return Err(crate::RedDBError::Query(
1348                                    "patch path 'metadata' requires a nested key".to_string(),
1349                                ));
1350                            }
1351                            op.path.remove(0);
1352                            metadata_ops.push(op);
1353                        }
1354                        _ => {
1355                            return Err(crate::RedDBError::Query(format!(
1356                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1357                            )));
1358                        }
1359                    }
1360                }
1361
1362                if !document_body_ops.is_empty() {
1363                    context_index_dirty = true;
1364                    let named = row.named.get_or_insert_with(Default::default);
1365                    let mut body = document_body_from_named(named)?;
1366                    apply_patch_operations_to_json(&mut body, &document_body_ops)
1367                        .map_err(crate::RedDBError::Query)?;
1368                    replace_document_row_body(named, body, &mut modified_columns)?;
1369                }
1370
1371                if is_document_collection {
1372                    if let Some(body) = payload.get("body") {
1373                        context_index_dirty = true;
1374                        let named = row.named.get_or_insert_with(Default::default);
1375                        replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1376                    }
1377                }
1378
1379                if !field_ops.is_empty() {
1380                    context_index_dirty = true;
1381                    for op in &field_ops {
1382                        if let Some(col) = op.path.first() {
1383                            modified_columns.push(col.clone());
1384                        }
1385                    }
1386                    let named = row.named.get_or_insert_with(Default::default);
1387                    apply_patch_operations_to_storage_map(named, &field_ops)?;
1388                }
1389
1390                if let Some(fields) = payload
1391                    .get("fields")
1392                    .and_then(crate::json::Value::as_object)
1393                {
1394                    if row_contract_timestamps {
1395                        for key in fields.keys() {
1396                            if key == "created_at" || key == "updated_at" {
1397                                return Err(crate::RedDBError::Query(format!(
1398                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1399                                    collection, key
1400                                )));
1401                            }
1402                        }
1403                    }
1404                    context_index_dirty = true;
1405                    let named = row.named.get_or_insert_with(Default::default);
1406                    for (key, value) in fields {
1407                        modified_columns.push(key.clone());
1408                        named.insert(key.clone(), json_to_storage_value(value)?);
1409                    }
1410                }
1411
1412                if !metadata_ops.is_empty() {
1413                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1414                    let metadata = patch_metadata.get_or_insert_with(|| {
1415                        store.get_metadata(&collection, id).unwrap_or_default()
1416                    });
1417                    let mut metadata_json = metadata_to_json(metadata);
1418                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1419                        .map_err(crate::RedDBError::Query)?;
1420                    *metadata = metadata_from_json(&metadata_json)?;
1421                    metadata_changed = true;
1422                }
1423
1424                if !modified_columns.is_empty() || row_contract_timestamps {
1425                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1426                    let current_fields = if let Some(named) = row.named.take() {
1427                        named.into_iter().collect::<Vec<_>>()
1428                    } else if let Some(schema) = row.schema.as_ref() {
1429                        schema
1430                            .iter()
1431                            .cloned()
1432                            .zip(row.columns.iter().cloned())
1433                            .collect::<Vec<_>>()
1434                    } else {
1435                        Vec::new()
1436                    };
1437                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
1438                    if row_contract_timestamps {
1439                        modified_columns.push("updated_at".to_string());
1440                        context_index_dirty = true;
1441                    }
1442                    if contract.requires_uniqueness_check(&modified_columns) {
1443                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1444                    }
1445                    row.named = Some(normalized_fields.into_iter().collect());
1446                }
1447            }
1448            crate::storage::EntityData::Node(node) => {
1449                let mut field_ops = Vec::new();
1450                let mut metadata_ops = Vec::new();
1451                let mut node_type_ops = Vec::new();
1452
1453                for mut op in operations {
1454                    let Some(root) = op.path.first().map(String::as_str) else {
1455                        return Err(crate::RedDBError::Query(
1456                            "patch path cannot be empty".to_string(),
1457                        ));
1458                    };
1459
1460                    match root {
1461                        "fields" | "properties" => {
1462                            if op.path.len() < 2 {
1463                                return Err(crate::RedDBError::Query(
1464                                    "patch path 'fields' requires a nested key".to_string(),
1465                                ));
1466                            }
1467                            op.path.remove(0);
1468                            field_ops.push(op);
1469                        }
1470                        "metadata" => {
1471                            if op.path.len() < 2 {
1472                                return Err(crate::RedDBError::Query(
1473                                    "patch path 'metadata' requires a nested key".to_string(),
1474                                ));
1475                            }
1476                            op.path.remove(0);
1477                            metadata_ops.push(op);
1478                        }
1479                        "node_type" => {
1480                            if op.path.len() != 1 {
1481                                return Err(crate::RedDBError::Query(
1482                                    "patch path 'node_type' does not allow nested keys".to_string(),
1483                                ));
1484                            }
1485                            op.path.clear();
1486                            node_type_ops.push(op);
1487                        }
1488                        _ => {
1489                            return Err(crate::RedDBError::Query(format!(
1490                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1491                            )));
1492                        }
1493                    }
1494                }
1495
1496                for op in node_type_ops {
1497                    context_index_dirty = true;
1498                    let value = op.value.ok_or_else(|| {
1499                        crate::RedDBError::Query("node_type operations require a value".to_string())
1500                    })?;
1501
1502                    match op.op {
1503                        PatchEntityOperationType::Unset => {
1504                            return Err(crate::RedDBError::Query(
1505                                "node_type cannot be unset through patch operations".to_string(),
1506                            ));
1507                        }
1508                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1509                            let Some(node_type) = value.as_str() else {
1510                                return Err(crate::RedDBError::Query(
1511                                    "node_type operation requires a text value".to_string(),
1512                                ));
1513                            };
1514                            graph_node_type = Some(node_type.to_string());
1515                            modified_columns.push("node_type".to_string());
1516                        }
1517                    }
1518                }
1519
1520                if !field_ops.is_empty() {
1521                    context_index_dirty = true;
1522                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1523                }
1524
1525                if let Some(fields) = payload
1526                    .get("fields")
1527                    .and_then(crate::json::Value::as_object)
1528                {
1529                    context_index_dirty = true;
1530                    for (key, value) in fields {
1531                        node.properties
1532                            .insert(key.clone(), json_to_storage_value(value)?);
1533                    }
1534                }
1535
1536                if !metadata_ops.is_empty() {
1537                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1538                    let metadata = patch_metadata.get_or_insert_with(|| {
1539                        store.get_metadata(&collection, id).unwrap_or_default()
1540                    });
1541                    let mut metadata_json = metadata_to_json(metadata);
1542                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1543                        .map_err(crate::RedDBError::Query)?;
1544                    *metadata = metadata_from_json(&metadata_json)?;
1545                    metadata_changed = true;
1546                }
1547            }
1548            crate::storage::EntityData::Edge(edge) => {
1549                let mut field_ops = Vec::new();
1550                let mut metadata_ops = Vec::new();
1551                let mut weight_ops = Vec::new();
1552
1553                for mut op in operations {
1554                    let Some(root) = op.path.first().map(String::as_str) else {
1555                        return Err(crate::RedDBError::Query(
1556                            "patch path cannot be empty".to_string(),
1557                        ));
1558                    };
1559
1560                    match root {
1561                        "fields" | "properties" => {
1562                            if op.path.len() < 2 {
1563                                return Err(crate::RedDBError::Query(
1564                                    "patch path 'fields' requires a nested key".to_string(),
1565                                ));
1566                            }
1567                            op.path.remove(0);
1568                            field_ops.push(op);
1569                        }
1570                        "weight" => {
1571                            if op.path.len() != 1 {
1572                                return Err(crate::RedDBError::Query(
1573                                    "patch path 'weight' does not allow nested keys".to_string(),
1574                                ));
1575                            }
1576                            op.path.clear();
1577                            weight_ops.push(op);
1578                        }
1579                        "metadata" => {
1580                            if op.path.len() < 2 {
1581                                return Err(crate::RedDBError::Query(
1582                                    "patch path 'metadata' requires a nested key".to_string(),
1583                                ));
1584                            }
1585                            op.path.remove(0);
1586                            metadata_ops.push(op);
1587                        }
1588                        _ => {
1589                            return Err(crate::RedDBError::Query(format!(
1590                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1591                            )));
1592                        }
1593                    }
1594                }
1595
1596                if !field_ops.is_empty() {
1597                    context_index_dirty = true;
1598                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1599                }
1600
1601                for op in weight_ops {
1602                    context_index_dirty = true;
1603                    let value = op.value.ok_or_else(|| {
1604                        crate::RedDBError::Query("weight operations require a value".to_string())
1605                    })?;
1606
1607                    match op.op {
1608                        PatchEntityOperationType::Unset => {
1609                            return Err(crate::RedDBError::Query(
1610                                "weight cannot be unset through patch operations".to_string(),
1611                            ));
1612                        }
1613                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1614                            let Some(weight) = value.as_f64() else {
1615                                return Err(crate::RedDBError::Query(
1616                                    "weight operation requires a numeric value".to_string(),
1617                                ));
1618                            };
1619                            graph_edge_weight = Some(weight as f32);
1620                            edge.weight = weight as f32;
1621                            modified_columns.push("weight".to_string());
1622                        }
1623                    }
1624                }
1625
1626                if let Some(fields) = payload
1627                    .get("fields")
1628                    .and_then(crate::json::Value::as_object)
1629                {
1630                    context_index_dirty = true;
1631                    for (key, value) in fields {
1632                        edge.properties
1633                            .insert(key.clone(), json_to_storage_value(value)?);
1634                    }
1635                }
1636
1637                if !metadata_ops.is_empty() {
1638                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1639                    let metadata = patch_metadata.get_or_insert_with(|| {
1640                        store.get_metadata(&collection, id).unwrap_or_default()
1641                    });
1642                    let mut metadata_json = metadata_to_json(metadata);
1643                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1644                        .map_err(crate::RedDBError::Query)?;
1645                    *metadata = metadata_from_json(&metadata_json)?;
1646                    metadata_changed = true;
1647                }
1648            }
1649            crate::storage::EntityData::Vector(vector) => {
1650                let mut field_ops = Vec::new();
1651                let mut metadata_ops = Vec::new();
1652
1653                for mut op in operations {
1654                    let Some(root) = op.path.first().map(String::as_str) else {
1655                        return Err(crate::RedDBError::Query(
1656                            "patch path cannot be empty".to_string(),
1657                        ));
1658                    };
1659
1660                    match root {
1661                        "fields" => {
1662                            if op.path.len() < 2 {
1663                                return Err(crate::RedDBError::Query(
1664                                    "patch path 'fields' requires a nested key".to_string(),
1665                                ));
1666                            }
1667                            op.path.remove(0);
1668                            let Some(target) = op.path.first().map(String::as_str) else {
1669                                return Err(crate::RedDBError::Query(
1670                                    "patch path requires a target under fields".to_string(),
1671                                ));
1672                            };
1673                            if !matches!(target, "dense" | "content" | "sparse") {
1674                                return Err(crate::RedDBError::Query(format!(
1675                                    "unsupported vector patch target '{target}'"
1676                                )));
1677                            }
1678                            field_ops.push(op);
1679                        }
1680                        "metadata" => {
1681                            if op.path.len() < 2 {
1682                                return Err(crate::RedDBError::Query(
1683                                    "patch path 'metadata' requires a nested key".to_string(),
1684                                ));
1685                            }
1686                            op.path.remove(0);
1687                            metadata_ops.push(op);
1688                        }
1689                        _ => {
1690                            return Err(crate::RedDBError::Query(format!(
1691                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1692                            )));
1693                        }
1694                    }
1695                }
1696
1697                if !field_ops.is_empty() {
1698                    context_index_dirty = true;
1699                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1700                }
1701
1702                if let Some(fields) = payload
1703                    .get("fields")
1704                    .and_then(crate::json::Value::as_object)
1705                {
1706                    context_index_dirty = true;
1707                    if let Some(content) =
1708                        fields.get("content").and_then(crate::json::Value::as_str)
1709                    {
1710                        vector.content = Some(content.to_string());
1711                    }
1712                    if let Some(dense) = fields.get("dense") {
1713                        vector.dense = dense
1714                            .as_array()
1715                            .ok_or_else(|| {
1716                                crate::RedDBError::Query(
1717                                    "field 'dense' must be an array".to_string(),
1718                                )
1719                            })?
1720                            .iter()
1721                            .map(|value| {
1722                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
1723                                    crate::RedDBError::Query(
1724                                        "field 'dense' must contain only numbers".to_string(),
1725                                    )
1726                                })
1727                            })
1728                            .collect::<Result<Vec<_>, _>>()?;
1729                    }
1730                }
1731
1732                if !metadata_ops.is_empty() {
1733                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1734                    let metadata = patch_metadata.get_or_insert_with(|| {
1735                        store.get_metadata(&collection, id).unwrap_or_default()
1736                    });
1737                    let mut metadata_json = metadata_to_json(metadata);
1738                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1739                        .map_err(crate::RedDBError::Query)?;
1740                    *metadata = metadata_from_json(&metadata_json)?;
1741                    metadata_changed = true;
1742                }
1743            }
1744            crate::storage::EntityData::TimeSeries(_)
1745            | crate::storage::EntityData::QueueMessage(_) => {
1746                return Err(crate::RedDBError::Query(
1747                    "patch operations are not supported for TimeSeries or QueueMessage entities"
1748                        .to_string(),
1749                ));
1750            }
1751        }
1752
1753        if let Some(node_type) = graph_node_type {
1754            if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1755                node.node_type = node_type;
1756            }
1757        }
1758        if let Some(weight) = graph_edge_weight {
1759            if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1760                edge.weight = (weight * 1000.0) as u32;
1761            }
1762        }
1763
1764        if let Some(metadata) = payload
1765            .get("metadata")
1766            .and_then(crate::json::Value::as_object)
1767        {
1768            let patch_metadata = patch_metadata
1769                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1770            for (key, value) in metadata {
1771                ensure_non_tree_reserved_metadata_key(key)?;
1772                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1773            }
1774            metadata_changed = true;
1775        }
1776
1777        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1778            let patch_metadata = patch_metadata
1779                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1780            if matches!(value, crate::storage::unified::MetadataValue::Null) {
1781                patch_metadata.remove(&key);
1782            } else {
1783                patch_metadata.set(key, value);
1784            }
1785            metadata_changed = true;
1786        }
1787
1788        entity.updated_at = std::time::SystemTime::now()
1789            .duration_since(std::time::UNIX_EPOCH)
1790            .unwrap_or_default()
1791            .as_secs();
1792
1793        modified_columns = dedupe_modified_columns(modified_columns);
1794
1795        Ok(AppliedEntityMutation {
1796            id,
1797            collection,
1798            entity,
1799            metadata: patch_metadata,
1800            modified_columns,
1801            persist_metadata: metadata_changed,
1802            context_index_dirty,
1803            replaced_entity: None,
1804            replaced_entity_previous_xmax: 0,
1805            pre_mutation_fields,
1806        })
1807    }
1808
1809    pub(crate) fn apply_loaded_sql_update_row_core(
1810        &self,
1811        collection: String,
1812        mut entity: crate::storage::UnifiedEntity,
1813        static_field_assignments: &[(String, Value)],
1814        dynamic_field_assignments: Vec<(String, Value)>,
1815        static_metadata_assignments: &[(String, MetadataValue)],
1816        dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1817        row_contract_plan: Option<&RowUpdateContractPlan>,
1818        row_modified_columns_template: &[String],
1819        row_touches_unique_columns: bool,
1820    ) -> RedDBResult<AppliedEntityMutation> {
1821        let id = entity.id;
1822        let previous_xmax = entity.xmax;
1823        let db = self.db();
1824        let store = db.store();
1825        let Some(_) = store.get_collection(&collection) else {
1826            return Err(crate::RedDBError::NotFound(format!(
1827                "collection not found: {collection}"
1828            )));
1829        };
1830
1831        let versioned_update_xid = match self.current_xid() {
1832            Some(xid) => Some(xid),
1833            None => {
1834                let snapshot_manager = self.snapshot_manager();
1835                let xid = snapshot_manager.begin();
1836                snapshot_manager.commit(xid);
1837                Some(xid)
1838            }
1839        };
1840        let mut replaced_entity = versioned_update_xid.map(|xid| {
1841            let mut old = entity.clone();
1842            old.set_xmax(xid);
1843            old
1844        });
1845
1846        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1847        let row_contract_timestamps = row_contract_plan
1848            .map(|plan| plan.timestamps_enabled)
1849            .unwrap_or(false);
1850        let mut metadata_changed = false;
1851        let mut modified_columns = row_modified_columns_template.to_vec();
1852        let mut context_index_dirty = !modified_columns.is_empty();
1853
1854        // Snapshot OLD field values BEFORE applying the assignments —
1855        // the secondary-index maintenance hook needs both before/after to
1856        // delete-then-insert under changed indexed columns.
1857        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1858
1859        let crate::storage::EntityData::Row(row) = &mut entity.data else {
1860            return Err(crate::RedDBError::Query(
1861                "SQL row update fast path requires a row entity".to_string(),
1862            ));
1863        };
1864
1865        let _ = row_contract_plan;
1866        apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1867        apply_row_field_assignments_raw(row, dynamic_field_assignments);
1868
1869        for (key, value) in static_metadata_assignments
1870            .iter()
1871            .cloned()
1872            .chain(dynamic_metadata_assignments)
1873        {
1874            ensure_non_tree_reserved_metadata_key(&key)?;
1875            patch_metadata
1876                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1877                .set(key, value);
1878            metadata_changed = true;
1879        }
1880
1881        if !modified_columns.is_empty() || row_contract_timestamps {
1882            let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1883            if row_contract_timestamps {
1884                context_index_dirty = true;
1885                set_row_field(
1886                    row,
1887                    "updated_at",
1888                    Value::UnsignedInteger(current_unix_ms_u64()),
1889                );
1890                modified_columns.push("updated_at".to_string());
1891            }
1892            if row_touches_unique_columns {
1893                let current_fields = collect_row_fields(row);
1894                contract.enforce_row_uniqueness(&current_fields, Some(id))?;
1895            }
1896        }
1897
1898        entity.updated_at = std::time::SystemTime::now()
1899            .duration_since(std::time::UNIX_EPOCH)
1900            .unwrap_or_default()
1901            .as_secs();
1902
1903        if let Some(xid) = versioned_update_xid {
1904            let logical_id = entity.logical_id();
1905            entity.id = store.next_entity_id();
1906            entity.set_logical_id(logical_id);
1907            entity.set_xmin(xid);
1908            entity.set_xmax(0);
1909            if let Some(old) = replaced_entity.as_mut() {
1910                old.set_xmax(xid);
1911            }
1912        }
1913
1914        modified_columns = dedupe_modified_columns(modified_columns);
1915
1916        Ok(AppliedEntityMutation {
1917            id: entity.id,
1918            collection,
1919            entity,
1920            metadata: patch_metadata,
1921            modified_columns,
1922            persist_metadata: metadata_changed,
1923            context_index_dirty,
1924            replaced_entity,
1925            replaced_entity_previous_xmax: previous_xmax,
1926            pre_mutation_fields,
1927        })
1928    }
1929
1930    pub(crate) fn persist_applied_entity_mutations(
1931        &self,
1932        applied: &[AppliedEntityMutation],
1933    ) -> RedDBResult<()> {
1934        if applied.is_empty() {
1935            return Ok(());
1936        }
1937
1938        let store = self.db().store();
1939        let collection = &applied[0].collection;
1940        let Some(manager) = store.get_collection(collection) else {
1941            return Err(crate::RedDBError::NotFound(format!(
1942                "collection not found: {collection}"
1943            )));
1944        };
1945
1946        let mut ordinary = Vec::with_capacity(applied.len());
1947        for item in applied {
1948            if let Some(old_version) = item.replaced_entity.as_ref() {
1949                store
1950                    .install_versioned_table_row_update(
1951                        collection,
1952                        old_version.clone(),
1953                        item.entity.clone(),
1954                        if item.persist_metadata {
1955                            item.metadata.as_ref()
1956                        } else {
1957                            None
1958                        },
1959                    )
1960                    .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1961                if self.current_xid().is_some() {
1962                    self.record_pending_versioned_update(
1963                        crate::runtime::impl_core::current_connection_id(),
1964                        collection,
1965                        old_version.id,
1966                        item.entity.id,
1967                        old_version.xmax,
1968                        item.replaced_entity_previous_xmax,
1969                    );
1970                }
1971            } else {
1972                ordinary.push(item);
1973            }
1974        }
1975        if ordinary.is_empty() {
1976            return Ok(());
1977        }
1978
1979        manager
1980            .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1981                (
1982                    &item.entity,
1983                    item.modified_columns.as_slice(),
1984                    if item.persist_metadata {
1985                        item.metadata.as_ref()
1986                    } else {
1987                        None
1988                    },
1989                )
1990            }))
1991            .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1992
1993        // PG-HOT-like fast path: segment in-place is done; when no
1994        // mutation touches a secondary-indexed column AND no metadata
1995        // payload needs to be folded into a B-tree record, skip the
1996        // in-line B-tree upsert. The WAL still records the update
1997        // (durability preserved; recovery replay rebuilds the B-tree),
1998        // and `manager.get()` prefers the live segment over the
1999        // B-tree for reads — so the short-circuit is invisible to
2000        // callers. See `persist_entities_to_pager_wal_only`.
2001        let indexed_cols = self
2002            .index_store_ref()
2003            .indexed_columns_set(collection.as_str());
2004        let all_hot = !indexed_cols.is_empty()
2005            && ordinary.iter().all(|item| {
2006                !item.persist_metadata
2007                    && !item
2008                        .modified_columns
2009                        .iter()
2010                        .any(|c| indexed_cols.contains(c))
2011            })
2012            || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2013
2014        // Pass `&[&UnifiedEntity]` — no per-entity clone. The SQL UPDATE
2015        // inner loop hands us `applied` which already owns the post-image
2016        // entity; all we need for the persist path is a read borrow.
2017        let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2018            ordinary.iter().map(|item| &item.entity).collect();
2019        let persist_fn = if all_hot {
2020            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2021        } else {
2022            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2023        };
2024        persist_fn(store.as_ref(), collection, &entity_refs)
2025            .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2026    }
2027
2028    pub(crate) fn flush_applied_entity_mutation(
2029        &self,
2030        applied: &AppliedEntityMutation,
2031    ) -> RedDBResult<()> {
2032        let store = self.db().store();
2033        if applied.context_index_dirty {
2034            store
2035                .context_index()
2036                .index_entity(&applied.collection, &applied.entity);
2037        }
2038        // Secondary-index maintenance for SQL UPDATE / JSON-Patch flows.
2039        // Skip when pre_mutation_fields is empty (entity wasn't a row, or
2040        // didn't carry recoverable column names) — there's nothing to
2041        // delete-then-insert in that case.
2042        //
2043        // Also build the CDC damage vector here so downstream consumers
2044        // see which columns changed without re-diffing.
2045        let mut changed_columns: Option<Vec<String>> = None;
2046        if !applied.pre_mutation_fields.is_empty() {
2047            let post = entity_row_fields_snapshot(&applied.entity);
2048            if !post.is_empty() {
2049                let damage = crate::application::entity::row_damage_vector(
2050                    &applied.pre_mutation_fields,
2051                    &post,
2052                );
2053                if !damage.is_empty() {
2054                    changed_columns = Some(
2055                        damage
2056                            .touched_columns()
2057                            .into_iter()
2058                            .map(str::to_string)
2059                            .collect(),
2060                    );
2061                }
2062
2063                // HOT-like fast path (P3.T2/T3): when no modified
2064                // column is covered by a secondary index, skip the
2065                // `index_entity_update` call entirely. The function
2066                // would short-circuit internally, but the call still
2067                // reads the registry lock + walks the damage vector
2068                // — avoiding it saves a few microseconds per UPDATE.
2069                // Page-local replace + t_ctid chain support (true
2070                // HOT) lives in a follow-up storage spec.
2071                let indexed_cols: std::collections::HashSet<String> = self
2072                    .index_store_ref()
2073                    .list_indices(applied.collection.as_str())
2074                    .into_iter()
2075                    .filter_map(|idx| idx.columns.first().cloned())
2076                    .collect();
2077                let modified_cols: std::collections::HashSet<String> = damage
2078                    .touched_columns()
2079                    .into_iter()
2080                    .map(str::to_string)
2081                    .collect();
2082                if let Some(old_version) = applied.replaced_entity.as_ref() {
2083                    let old_index_fields: Vec<(String, Value)> = applied
2084                        .pre_mutation_fields
2085                        .iter()
2086                        .filter(|(col, _)| indexed_cols.contains(col))
2087                        .cloned()
2088                        .collect();
2089                    let new_index_fields: Vec<(String, Value)> = post
2090                        .iter()
2091                        .filter(|(col, _)| indexed_cols.contains(col))
2092                        .cloned()
2093                        .collect();
2094                    if !old_index_fields.is_empty() {
2095                        self.index_store_ref()
2096                            .index_entity_delete(
2097                                &applied.collection,
2098                                old_version.id,
2099                                &old_index_fields,
2100                            )
2101                            .map_err(crate::RedDBError::Internal)?;
2102                    }
2103                    if !new_index_fields.is_empty() {
2104                        self.index_store_ref()
2105                            .index_entity_insert(
2106                                &applied.collection,
2107                                applied.entity.id,
2108                                &new_index_fields,
2109                            )
2110                            .map_err(crate::RedDBError::Internal)?;
2111                    }
2112                } else {
2113                    let decision = crate::storage::engine::hot_update::decide(
2114                        &crate::storage::engine::hot_update::HotUpdateInputs {
2115                            collection: applied.collection.as_str(),
2116                            indexed_columns: &indexed_cols,
2117                            modified_columns: &modified_cols,
2118                            // The storage layer currently handles fit via
2119                            // the segment abstraction; we bypass the
2120                            // page-size check here.
2121                            new_tuple_size: 0,
2122                            page_free_space: usize::MAX,
2123                        },
2124                    );
2125                    if !decision.can_hot {
2126                        self.index_store_ref()
2127                            .index_entity_update(
2128                                &applied.collection,
2129                                applied.id,
2130                                &applied.pre_mutation_fields,
2131                                &post,
2132                            )
2133                            .map_err(crate::RedDBError::Internal)?;
2134                    } else {
2135                        // F-04: `applied.collection` is tenant-supplied;
2136                        // strip CR/LF/control bytes via the LogField
2137                        // escaper (ADR 0010).
2138                        tracing::debug!(
2139                            collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2140                            "hot_update fast-path: skipped index_entity_update"
2141                        );
2142                    }
2143                }
2144            }
2145        }
2146        self.cdc_emit_prebuilt_with_columns(
2147            crate::replication::cdc::ChangeOperation::Update,
2148            &applied.collection,
2149            &applied.entity,
2150            "entity",
2151            applied.metadata.as_ref(),
2152            true,
2153            changed_columns,
2154        );
2155        Ok(())
2156    }
2157
2158    pub(crate) fn apply_loaded_patch_entity(
2159        &self,
2160        collection: String,
2161        entity: crate::storage::UnifiedEntity,
2162        payload: JsonValue,
2163        operations: Vec<PatchEntityOperation>,
2164    ) -> RedDBResult<CreateEntityOutput> {
2165        let applied =
2166            self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2167        self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2168        self.flush_applied_entity_mutation(&applied)?;
2169        Ok(CreateEntityOutput {
2170            id: applied.id,
2171            entity: Some(applied.entity),
2172        })
2173    }
2174}
2175
2176fn ensure_non_tree_reserved_metadata_patch_paths(
2177    operations: &[PatchEntityOperation],
2178) -> RedDBResult<()> {
2179    for operation in operations {
2180        let Some(key) = operation.path.first().map(String::as_str) else {
2181            continue;
2182        };
2183        ensure_non_tree_reserved_metadata_key(key)?;
2184    }
2185    Ok(())
2186}
2187
2188fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2189    if key.starts_with(TREE_METADATA_PREFIX) {
2190        return Err(crate::RedDBError::Query(format!(
2191            "metadata key '{}' is reserved for managed trees",
2192            key
2193        )));
2194    }
2195    Ok(())
2196}
2197
2198fn ensure_non_tree_reserved_metadata_entries(
2199    metadata: &[(String, MetadataValue)],
2200) -> RedDBResult<()> {
2201    for (key, _) in metadata {
2202        ensure_non_tree_reserved_metadata_key(key)?;
2203    }
2204    Ok(())
2205}
2206
2207fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2208    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2209        return Err(crate::RedDBError::Query(format!(
2210            "edge label '{}' is reserved for managed trees",
2211            TREE_CHILD_EDGE_LABEL
2212        )));
2213    }
2214    Ok(())
2215}
2216
2217impl RedDBRuntime {
2218    pub(crate) fn create_node_unchecked(
2219        &self,
2220        input: CreateNodeInput,
2221    ) -> RedDBResult<CreateEntityOutput> {
2222        let db = self.db();
2223        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2224        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2225        let mut metadata = input.metadata;
2226        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2227        let mut builder = db.node(&input.collection, &input.label);
2228
2229        if let Some(node_type) = input.node_type {
2230            builder = builder.node_type(node_type);
2231        }
2232
2233        for (key, value) in input.properties {
2234            builder = builder.property(key, value);
2235        }
2236
2237        for (key, value) in metadata {
2238            builder = builder.metadata(key, value);
2239        }
2240
2241        for embedding in input.embeddings {
2242            if let Some(model) = embedding.model {
2243                builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2244            } else {
2245                builder = builder.embedding(embedding.name, embedding.vector);
2246            }
2247        }
2248
2249        for link in input.table_links {
2250            builder = builder.link_to_table(link.key, link.table);
2251        }
2252
2253        for link in input.node_links {
2254            builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2255        }
2256
2257        let id = builder.save()?;
2258        // Phase 1.1 MVCC universal: stamp xmin so concurrent snapshots
2259        // don't see this node until the transaction commits.
2260        self.stamp_xmin_if_in_txn(&input.collection, id);
2261        refresh_context_index(&db, &input.collection, id)?;
2262        self.cdc_emit(
2263            crate::replication::cdc::ChangeOperation::Insert,
2264            &input.collection,
2265            id.raw(),
2266            "graph_node",
2267        );
2268        Ok(CreateEntityOutput {
2269            id,
2270            entity: db.store().get(&input.collection, id),
2271        })
2272    }
2273
2274    pub(crate) fn create_edge_unchecked(
2275        &self,
2276        input: CreateEdgeInput,
2277    ) -> RedDBResult<CreateEntityOutput> {
2278        let db = self.db();
2279        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2280        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2281        let mut metadata = input.metadata;
2282        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2283        let mut builder = db
2284            .edge(&input.collection, &input.label)
2285            .from(input.from)
2286            .to(input.to);
2287
2288        if let Some(weight) = input.weight {
2289            builder = builder.weight(weight);
2290        }
2291
2292        for (key, value) in input.properties {
2293            builder = builder.property(key, value);
2294        }
2295
2296        for (key, value) in metadata {
2297            builder = builder.metadata(key, value);
2298        }
2299
2300        let id = builder.save()?;
2301        // Phase 1.1 MVCC universal: stamp xmin on the edge so other
2302        // sessions don't follow it until COMMIT.
2303        self.stamp_xmin_if_in_txn(&input.collection, id);
2304        refresh_context_index(&db, &input.collection, id)?;
2305        self.cdc_emit(
2306            crate::replication::cdc::ChangeOperation::Insert,
2307            &input.collection,
2308            id.raw(),
2309            "graph_edge",
2310        );
2311        Ok(CreateEntityOutput {
2312            id,
2313            entity: db.store().get(&input.collection, id),
2314        })
2315    }
2316}
2317
2318fn create_rows_batch_prevalidated_columnar_with_outputs(
2319    runtime: &RedDBRuntime,
2320    collection: String,
2321    column_names: std::sync::Arc<Vec<String>>,
2322    rows: Vec<Vec<crate::storage::schema::Value>>,
2323) -> RedDBResult<Vec<CreateEntityOutput>> {
2324    use crate::storage::{
2325        unified::{EntityData, EntityKind, RowData},
2326        EntityId, UnifiedEntity,
2327    };
2328    use std::sync::Arc;
2329
2330    if rows.is_empty() {
2331        return Ok(Vec::new());
2332    }
2333    runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2334    runtime.check_batch_size(rows.len())?;
2335    runtime.check_db_size()?;
2336
2337    let db = runtime.db();
2338    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2339    contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2340
2341    let store = db.store();
2342    let table_arc: Arc<str> = Arc::from(collection.as_str());
2343
2344    let indexed_cols = runtime
2345        .index_store_ref()
2346        .indexed_columns_set(collection.as_str());
2347    let has_secondary_indexes = !indexed_cols.is_empty();
2348    let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2349        if has_secondary_indexes {
2350            Vec::with_capacity(rows.len())
2351        } else {
2352            Vec::new()
2353        };
2354
2355    let entities: Vec<UnifiedEntity> = rows
2356        .into_iter()
2357        .map(|values| {
2358            if has_secondary_indexes {
2359                let mut snap: Vec<(String, crate::storage::schema::Value)> =
2360                    Vec::with_capacity(indexed_cols.len());
2361                for (name, val) in column_names.iter().zip(values.iter()) {
2362                    if indexed_cols.contains(name) {
2363                        snap.push((name.clone(), val.clone()));
2364                    }
2365                }
2366                field_snapshots.push(snap);
2367            }
2368            let mut row = RowData::new(values);
2369            row.schema = Some(Arc::clone(&column_names));
2370            UnifiedEntity::new(
2371                EntityId::new(0),
2372                EntityKind::TableRow {
2373                    table: Arc::clone(&table_arc),
2374                    row_id: 0,
2375                },
2376                EntityData::Row(row),
2377            )
2378        })
2379        .collect();
2380
2381    let ids = store
2382        .bulk_insert(&collection, entities)
2383        .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2384
2385    if has_secondary_indexes {
2386        let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2387            .iter()
2388            .zip(field_snapshots)
2389            .map(|(id, fields)| (*id, fields))
2390            .collect();
2391        runtime
2392            .index_store_ref()
2393            .index_entity_insert_batch(&collection, &index_rows)
2394            .map_err(crate::RedDBError::Internal)?;
2395    }
2396
2397    runtime.invalidate_result_cache();
2398    runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2399
2400    Ok(ids
2401        .into_iter()
2402        .map(|id| CreateEntityOutput { id, entity: None })
2403        .collect())
2404}
2405
2406impl RuntimeEntityPort for RedDBRuntime {
2407    fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2408        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2409        let db = self.db();
2410        let CreateRowInput {
2411            collection,
2412            fields,
2413            metadata: input_metadata,
2414            node_links,
2415            vector_links,
2416        } = input;
2417        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2418        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2419        let mut metadata = input_metadata;
2420        apply_collection_default_ttl(&db, &collection, &mut metadata);
2421        let fields = contract.normalize_insert_fields(fields)?;
2422        contract.enforce_row_uniqueness(&fields, None)?;
2423        // Route through MutationEngine for unified hot path.
2424        let engine = self.mutation_engine();
2425        let result = engine.apply(
2426            collection.clone(),
2427            vec![crate::runtime::mutation::MutationRow {
2428                fields,
2429                metadata,
2430                node_links,
2431                vector_links,
2432            }],
2433        )?;
2434        let id = result.ids[0];
2435        // Perf: `db.get(id)` does a *cross-collection* scan (get_any)
2436        // that also takes a write lock on the entity cache. We know
2437        // the collection — hit the manager directly. Cuts
2438        // create_row() p50 roughly in half on the hot path.
2439        Ok(CreateEntityOutput {
2440            id,
2441            entity: db.store().get(&collection, id),
2442        })
2443    }
2444
2445    fn create_rows_batch(
2446        &self,
2447        input: CreateRowsBatchInput,
2448    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2449        if input.rows.is_empty() {
2450            return Ok(Vec::new());
2451        }
2452        self.check_batch_size(input.rows.len())?;
2453        self.check_db_size()?;
2454
2455        let db = self.db();
2456        let collection = input.collection;
2457        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2458        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2459
2460        let mut prepared_rows = Vec::with_capacity(input.rows.len());
2461        let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2462        for row in input.rows {
2463            if row.collection != collection {
2464                return Err(crate::RedDBError::Query(format!(
2465                    "batch row collection mismatch: expected '{}', got '{}'",
2466                    collection, row.collection
2467                )));
2468            }
2469
2470            let mut metadata = row.metadata;
2471            apply_collection_default_ttl(&db, &collection, &mut metadata);
2472            let fields = contract.normalize_insert_fields(row.fields)?;
2473            contract.enforce_row_uniqueness(&fields, None)?;
2474            uniqueness_rows.push(fields.clone());
2475            prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2476        }
2477
2478        contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2479
2480        // Route through MutationEngine: single bulk_insert + one CDC batch
2481        // instead of N separate cdc_emit() calls (each acquires a write lock).
2482        let engine = {
2483            let e = self.mutation_engine();
2484            if input.suppress_events {
2485                e.with_suppress_events()
2486            } else {
2487                e
2488            }
2489        };
2490        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2491            .into_iter()
2492            .map(|(fields, metadata, node_links, vector_links)| {
2493                crate::runtime::mutation::MutationRow {
2494                    fields,
2495                    metadata,
2496                    node_links,
2497                    vector_links,
2498                }
2499            })
2500            .collect();
2501
2502        let result = engine
2503            .apply(collection.clone(), mutation_rows)
2504            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2505
2506        let store = db.store();
2507        Ok(result
2508            .ids
2509            .into_iter()
2510            .map(|id| CreateEntityOutput {
2511                id,
2512                entity: store.get(&collection, id),
2513            })
2514            .collect())
2515    }
2516
2517    fn create_rows_batch_prevalidated_columnar(
2518        &self,
2519        collection: String,
2520        column_names: std::sync::Arc<Vec<String>>,
2521        rows: Vec<Vec<crate::storage::schema::Value>>,
2522    ) -> RedDBResult<usize> {
2523        create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2524            .map(|outputs| outputs.len())
2525    }
2526
2527    fn create_rows_batch_columnar(
2528        &self,
2529        collection: String,
2530        column_names: std::sync::Arc<Vec<String>>,
2531        rows: Vec<Vec<crate::storage::schema::Value>>,
2532    ) -> RedDBResult<usize> {
2533        self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2534            .map(|outputs| outputs.len())
2535    }
2536
2537    fn create_rows_batch_columnar_with_outputs(
2538        &self,
2539        collection: String,
2540        column_names: std::sync::Arc<Vec<String>>,
2541        rows: Vec<Vec<crate::storage::schema::Value>>,
2542    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2543        if rows.is_empty() {
2544            return Ok(Vec::new());
2545        }
2546        self.check_batch_size(rows.len())?;
2547        self.check_db_size()?;
2548
2549        let db = self.db();
2550        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2551        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2552
2553        // Fast path: when the collection carries no contract (or the
2554        // contract has no declared columns) `normalize_insert_fields`
2555        // is a no-op and `enforce_row_uniqueness` finds no unique
2556        // columns to check. Skip the per-row `(String, Value)` tuple
2557        // materialisation entirely and route through the prevalidated
2558        // columnar kernel — same write, CDC, and index path, just
2559        // without the wasted (String, Value) clones. This is the
2560        // bench `bench_users` shape (no contract declared by the
2561        // adapter's `setup_schema`).
2562        let needs_normalisation = match db.collection_contract(&collection) {
2563            Some(c) => {
2564                c.declared_model == crate::catalog::CollectionModel::Table
2565                    && (!c.declared_columns.is_empty()
2566                        || c.table_def
2567                            .as_ref()
2568                            .map(|t| !t.columns.is_empty())
2569                            .unwrap_or(false))
2570            }
2571            None => false,
2572        };
2573        if !needs_normalisation {
2574            return create_rows_batch_prevalidated_columnar_with_outputs(
2575                self,
2576                collection,
2577                column_names,
2578                rows,
2579            );
2580        }
2581
2582        // Slow path: contract requires per-row normalisation /
2583        // uniqueness checks. Materialise `Vec<(String, Value)>` from
2584        // the columnar layout (this still pays N×ncols `String::clone`
2585        // — but it's deferred out of the wire decoder hot loop and
2586        // happens behind a runtime API, not the per-row decode loop)
2587        // and fall through to the existing `create_rows_batch` path.
2588        let ncols = column_names.len();
2589        let tuple_rows: Vec<CreateRowInput> = rows
2590            .into_iter()
2591            .map(|values| {
2592                let mut fields: Vec<(String, crate::storage::schema::Value)> =
2593                    Vec::with_capacity(ncols);
2594                for (name, value) in column_names.iter().zip(values) {
2595                    fields.push((name.clone(), value));
2596                }
2597                CreateRowInput {
2598                    collection: collection.clone(),
2599                    fields,
2600                    metadata: Vec::new(),
2601                    node_links: Vec::new(),
2602                    vector_links: Vec::new(),
2603                }
2604            })
2605            .collect();
2606        self.create_rows_batch(CreateRowsBatchInput {
2607            collection,
2608            rows: tuple_rows,
2609            suppress_events: false,
2610        })
2611    }
2612
2613    fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2614        if input.rows.is_empty() {
2615            return Ok(0);
2616        }
2617        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2618        self.check_batch_size(input.rows.len())?;
2619        self.check_db_size()?;
2620
2621        let db = self.db();
2622        let collection = input.collection;
2623        // Still verify the collection's declared model before we blast
2624        // rows at it — this one-off check is O(1), independent of
2625        // ncols, and catches schema-kind mismatches that the client
2626        // can't always see.
2627        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2628        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2629
2630        // Hoist the per-collection default TTL lookup out of the
2631        // per-row loop — it only depends on the collection, not on
2632        // individual rows, and the old path did one HashMap read per
2633        // row (25k reads for a 25k typed_insert bulk). Fetch it once,
2634        // apply to any row whose metadata doesn't already carry a TTL.
2635        let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2636
2637        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2638            Vec::with_capacity(input.rows.len());
2639        let mut mutation_rows = mutation_rows;
2640        for row in input.rows {
2641            if row.collection != collection {
2642                return Err(crate::RedDBError::Query(format!(
2643                    "batch row collection mismatch: expected '{}', got '{}'",
2644                    collection, row.collection
2645                )));
2646            }
2647            let mut metadata = row.metadata;
2648            if let Some(ttl) = default_ttl_ms {
2649                if !has_internal_ttl_metadata(&metadata) {
2650                    metadata.push((
2651                        "_ttl_ms".to_string(),
2652                        if ttl <= i64::MAX as u64 {
2653                            MetadataValue::Int(ttl as i64)
2654                        } else {
2655                            MetadataValue::Timestamp(ttl)
2656                        },
2657                    ));
2658                }
2659            }
2660            mutation_rows.push(crate::runtime::mutation::MutationRow {
2661                fields: row.fields,
2662                metadata,
2663                node_links: row.node_links,
2664                vector_links: row.vector_links,
2665            });
2666        }
2667
2668        let engine = self.mutation_engine();
2669        let result = engine
2670            .apply(collection, mutation_rows)
2671            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2672        Ok(result.ids.len())
2673    }
2674
2675    fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2676        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2677        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2678            input.properties.iter().map(|(key, _)| key.as_str()),
2679            &format!("node '{}'", input.collection),
2680        )?;
2681        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2682        self.create_node_unchecked(input)
2683    }
2684
2685    fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2686        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2687        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2688            input.properties.iter().map(|(key, _)| key.as_str()),
2689            &format!("edge '{}'", input.collection),
2690        )?;
2691        ensure_non_tree_structural_edge_label(&input.label)?;
2692        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2693        self.create_edge_unchecked(input)
2694    }
2695
    /// Inserts a vector entity into a vector collection.
    ///
    /// Checks the write gate, that the collection is declared as a vector
    /// collection, and the vector-dimension contract for `input.dense`. The
    /// collection's default TTL is added to the metadata when none is present.
    /// After saving, the entity is stamped with MVCC `xmin`, the context index
    /// is refreshed, the optional turbo write path runs (only when the
    /// collection has turbo state), and an insert CDC event tagged `"vector"`
    /// is emitted.
    ///
    /// # Errors
    /// Propagates write-gate, contract, `save` and context-index failures.
    /// Results of the turbo WAL append and extent append are discarded.
    fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
        let db = self.db();
        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
        contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
        ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
        let mut metadata = input.metadata;
        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
        // `input.dense` is moved into the builder here.
        let mut builder = db.vector(&input.collection).dense(input.dense);

        if let Some(content) = input.content {
            builder = builder.content(content);
        }

        for (key, value) in metadata {
            builder = builder.metadata(key, value);
        }

        if let Some(link_row) = input.link_row {
            builder = builder.link_to_table(link_row);
        }

        if let Some(link_node) = input.link_node {
            builder = builder.link_to_node(link_node);
        }

        let id = builder.save()?;
        // The dense values were moved into the builder, so read them back from
        // the stored entity for the turbo path. `None` if the entity cannot be
        // fetched or is not a vector.
        let dense_for_turbo = match db.store().get(&input.collection, id).map(|e| e.data) {
            Some(crate::storage::unified::EntityData::Vector(data)) => Some(data.dense.clone()),
            _ => None,
        };
        // Phase 1.1 MVCC universal: stamp xmin on the vector so
        // concurrent ANN scans hide it until the transaction commits.
        self.stamp_xmin_if_in_txn(&input.collection, id);
        refresh_context_index(&db, &input.collection, id)?;
        // Issue #693 — vector.turbo write path. Order matters and
        // matches the write sequence the next slice (#673) will
        // attach kill-points to: WAL append → in-memory index update
        // → TurboExtent append. The legacy `vector` path is the
        // `None` branch and is untouched.
        if let (Some(state), Some(dense)) = (db.turbo_state(&input.collection), dense_for_turbo) {
            // Lazy populate so an INSERT after restart sees the
            // pre-restart vectors in the index before appending its
            // own.
            state.ensure_populated(&db.store(), &input.collection);
            // Issue #694 — named crash boundaries for #673.
            use crate::runtime::turbo_crash_inject::{fire, InjectionPoint};
            fire(InjectionPoint::BeforeWalFsync);
            // NOTE(review): the append result is discarded — confirm that a
            // failed WAL append is meant to be non-fatal for the insert.
            let _ = db
                .store()
                .append_vector_insert_record(&input.collection, id.raw(), &dense);
            fire(InjectionPoint::BeforeIndexCommit);
            // The index guard stays alive until the end of this `if` block, so
            // it is still held while the extent lock below is taken.
            let mut index = state.index.lock();
            index.insert(id, dense.clone());
            // Mirror the encoded codes into the per-collection
            // TurboExtent when one is available. The bytes are the
            // 4-bit-packed code groups (`n_byte_groups`) plus the
            // little-endian L2 norm — same shape `BlockedCodeStorage`
            // appends in-memory. Failures are non-fatal in this
            // slice; durability of the extent is owned by #673.
            fire(InjectionPoint::BeforeExtentFsync);
            if let Some(extent) = state.extent.lock().as_mut() {
                let packed = index.encode_for_extent(&dense);
                let _ = extent.append(&packed);
            }
        }
        self.cdc_emit(
            crate::replication::cdc::ChangeOperation::Insert,
            &input.collection,
            id.raw(),
            "vector",
        );
        Ok(CreateEntityOutput {
            id,
            entity: db.store().get(&input.collection, id),
        })
    }
2773
2774    fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2775        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2776        let db = self.db();
2777        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2778        contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2779
2780        if let JsonValue::Object(ref map) = input.body {
2781            crate::reserved_fields::ensure_no_reserved_public_item_fields(
2782                map.keys().map(String::as_str),
2783                &format!("document '{}'", input.collection),
2784            )?;
2785        }
2786
2787        // Serialize the full body as Value::Json for the "body" field
2788        let body_bytes = json_to_vec(&input.body).map_err(|err| {
2789            crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2790        })?;
2791        let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2792            "body".to_string(),
2793            crate::storage::schema::Value::Json(body_bytes),
2794        )];
2795
2796        // Flatten top-level keys from the body into named fields for filtering
2797        if let JsonValue::Object(ref map) = input.body {
2798            for (key, value) in map {
2799                let storage_value = json_to_storage_value(value)?;
2800                fields.push((key.clone(), storage_value));
2801            }
2802        }
2803
2804        let mut metadata = input.metadata;
2805        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2806        let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2807            .iter()
2808            .map(|(key, value)| (key.as_str(), value.clone()))
2809            .collect();
2810        let mut builder = db.row(&input.collection, columns);
2811
2812        for (key, value) in metadata {
2813            builder = builder.metadata(key, value);
2814        }
2815
2816        for node in input.node_links {
2817            builder = builder.link_to_node(node);
2818        }
2819
2820        for vector in input.vector_links {
2821            builder = builder.link_to_vector(vector);
2822        }
2823
2824        let id = builder.save()?;
2825        // Phase 1.1 MVCC universal: stamp xmin on the document.
2826        self.stamp_xmin_if_in_txn(&input.collection, id);
2827        refresh_context_index(&db, &input.collection, id)?;
2828        self.cdc_emit(
2829            crate::replication::cdc::ChangeOperation::Insert,
2830            &input.collection,
2831            id.raw(),
2832            "document",
2833        );
2834        Ok(CreateEntityOutput {
2835            id,
2836            entity: db.store().get(&input.collection, id),
2837        })
2838    }
2839
2840    fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2841        let db = self.db();
2842        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2843        let declared_model = db
2844            .collection_contract(&input.collection)
2845            .map(|contract| contract.declared_model);
2846        let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2847            contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2848            self.seal_vault_value(&input.collection, input.value)?
2849        } else {
2850            contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2851            input.value
2852        };
2853        let fields = vec![
2854            (
2855                "key".to_string(),
2856                crate::storage::schema::Value::text(input.key),
2857            ),
2858            ("value".to_string(), value),
2859        ];
2860        let collection = input.collection;
2861        let result = self.mutation_engine().apply(
2862            collection.clone(),
2863            vec![crate::runtime::mutation::MutationRow {
2864                fields,
2865                metadata: input.metadata,
2866                node_links: Vec::new(),
2867                vector_links: Vec::new(),
2868            }],
2869        )?;
2870        let id = result.ids[0];
2871        Ok(CreateEntityOutput {
2872            id,
2873            entity: db.store().get(&collection, id),
2874        })
2875    }
2876
2877    fn create_timeseries_point(
2878        &self,
2879        input: CreateTimeSeriesPointInput,
2880    ) -> RedDBResult<CreateEntityOutput> {
2881        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2882        let db = self.db();
2883        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2884        contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2885
2886        let mut fields = vec![
2887            (
2888                "metric".to_string(),
2889                crate::storage::schema::Value::text(input.metric),
2890            ),
2891            (
2892                "value".to_string(),
2893                crate::storage::schema::Value::Float(input.value),
2894            ),
2895        ];
2896
2897        if let Some(timestamp_ns) = input.timestamp_ns {
2898            fields.push((
2899                "timestamp".to_string(),
2900                crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2901            ));
2902        }
2903
2904        if !input.tags.is_empty() {
2905            let tags_json = JsonValue::Object(
2906                input
2907                    .tags
2908                    .into_iter()
2909                    .map(|(key, value)| (key, JsonValue::String(value)))
2910                    .collect(),
2911            );
2912            let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2913                crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2914            })?;
2915            fields.push((
2916                "tags".to_string(),
2917                crate::storage::schema::Value::Json(tags_bytes),
2918            ));
2919        }
2920
2921        let collection = input.collection;
2922        let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2923        // Phase 1.1 MVCC universal: stamp xmin on the point so
2924        // concurrent range scans hide it until COMMIT.
2925        self.stamp_xmin_if_in_txn(&collection, id);
2926        refresh_context_index(&db, &collection, id)?;
2927
2928        Ok(CreateEntityOutput {
2929            id,
2930            entity: db.store().get(&collection, id),
2931        })
2932    }
2933
2934    fn get_kv(
2935        &self,
2936        collection: &str,
2937        key: &str,
2938    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2939        let db = self.db();
2940        ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2941        let store = db.store();
2942        let Some(manager) = store.get_collection(collection) else {
2943            return Ok(None);
2944        };
2945        let entities = manager.query_all(|_| true);
2946        for entity in entities {
2947            if let crate::storage::EntityData::Row(ref row) = entity.data {
2948                if let Some(ref named) = row.named {
2949                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2950                        if &**k == key {
2951                            let value = named
2952                                .get("value")
2953                                .cloned()
2954                                .unwrap_or(crate::storage::schema::Value::Null);
2955                            return Ok(Some((value, entity.id)));
2956                        }
2957                    }
2958                }
2959            }
2960        }
2961        Ok(None)
2962    }
2963
2964    fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2965        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2966        let found = self.get_kv(collection, key)?;
2967        if let Some((_, id)) = found {
2968            let db = self.db();
2969            let store = db.store();
2970            let deleted = store
2971                .delete(collection, id)
2972                .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2973            if deleted {
2974                store.context_index().remove_entity(id);
2975            }
2976            Ok(deleted)
2977        } else {
2978            Ok(false)
2979        }
2980    }
2981
2982    fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2983        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2984        let PatchEntityInput {
2985            collection,
2986            id,
2987            payload,
2988            operations,
2989        } = input;
2990        let db = self.db();
2991        let store = db.store();
2992        let Some(manager) = store.get_collection(&collection) else {
2993            return Err(crate::RedDBError::NotFound(format!(
2994                "collection not found: {collection}"
2995            )));
2996        };
2997        let Some(entity) = manager.get(id) else {
2998            return Err(crate::RedDBError::NotFound(format!(
2999                "entity not found: {}",
3000                id.raw()
3001            )));
3002        };
3003        self.apply_loaded_patch_entity(collection, entity, payload, operations)
3004    }
3005
3006    fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
3007        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3008        let store = self.db().store();
3009        // Snapshot row fields before delete so we can mirror the removal
3010        // into every secondary index. The fetch is best-effort: if the
3011        // entity is already gone, the delete below is a no-op anyway.
3012        let pre_delete_fields = store
3013            .get(&input.collection, input.id)
3014            .as_ref()
3015            .map(entity_row_fields_snapshot)
3016            .unwrap_or_default();
3017        // Store delete first (source of truth). Crash between here and index removal
3018        // leaves the entity invisible to most queries but recoverable; the reverse
3019        // (remove index first, then crash) leaves the entity permanently orphaned.
3020        let deleted = store
3021            .delete(&input.collection, input.id)
3022            .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3023        if deleted {
3024            store.context_index().remove_entity(input.id);
3025            // Secondary index maintenance — surface only registry-shape
3026            // errors; missing-index removals are tolerated inside the call.
3027            if !pre_delete_fields.is_empty() {
3028                self.index_store_ref()
3029                    .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
3030                    .map_err(crate::RedDBError::Internal)?;
3031            }
3032            self.cdc_emit(
3033                crate::replication::cdc::ChangeOperation::Delete,
3034                &input.collection,
3035                input.id.raw(),
3036                "entity",
3037            );
3038        }
3039        Ok(DeleteEntityOutput {
3040            deleted,
3041            id: input.id,
3042        })
3043    }
3044}