Skip to main content

reddb_server/application/
ports_impls_entity.rs

1use std::collections::HashMap;
2
3use crate::application::entity::{
4    AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5    RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8    has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21    db: &crate::storage::unified::devx::RedDB,
22    collection: &str,
23    metadata: &mut Vec<(String, MetadataValue)>,
24) {
25    if has_internal_ttl_metadata(metadata) {
26        return;
27    }
28
29    let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30        return;
31    };
32
33    metadata.push((
34        "_ttl_ms".to_string(),
35        if default_ttl_ms <= i64::MAX as u64 {
36            MetadataValue::Int(default_ttl_ms as i64)
37        } else {
38            MetadataValue::Timestamp(default_ttl_ms)
39        },
40    ));
41}
42
43fn refresh_context_index(
44    db: &crate::storage::unified::devx::RedDB,
45    collection: &str,
46    id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48    let store = db.store();
49    let Some(entity) = store.get(collection, id) else {
50        return Ok(());
51    };
52
53    store.context_index().index_entity(collection, &entity);
54    Ok(())
55}
56
57/// Pull `(name, value)` pairs for every named column on a row entity.
58/// Returns empty if the entity is not a row, or if the row carries
59/// neither a `named` map nor a `schema` Arc — both of those mean the
60/// names aren't recoverable here, so secondary-index maintenance has
61/// nothing to act on. Used by the delete + update paths.
62pub(crate) fn entity_row_fields_snapshot(
63    entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65    let crate::storage::EntityData::Row(row) = &entity.data else {
66        return Vec::new();
67    };
68    if let Some(named) = &row.named {
69        return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70    }
71    if let Some(schema) = &row.schema {
72        return schema
73            .iter()
74            .zip(row.columns.iter())
75            .map(|(name, value)| (name.clone(), value.clone()))
76            .collect();
77    }
78    Vec::new()
79}
80
81fn ensure_collection_model_contract(
82    db: &crate::storage::unified::devx::RedDB,
83    collection: &str,
84    requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86    if let Some(contract) = db.collection_contract(collection) {
87        if !contract_enforces_model(&contract) {
88            return Ok(());
89        }
90        if collection_model_allows(contract.declared_model, requested_model) {
91            return Ok(());
92        }
93        return Err(crate::RedDBError::InvalidOperation(format!(
94            "collection '{}' is declared as '{}' and does not allow '{}' writes",
95            collection,
96            collection_model_name(contract.declared_model),
97            collection_model_name(requested_model)
98        )));
99    }
100
101    let now = implicit_contract_unix_ms();
102    db.save_collection_contract(crate::physical::CollectionContract {
103        name: collection.to_string(),
104        declared_model: requested_model,
105        schema_mode: crate::catalog::SchemaMode::Dynamic,
106        origin: crate::physical::ContractOrigin::Implicit,
107        version: 1,
108        created_at_unix_ms: now,
109        updated_at_unix_ms: now,
110        default_ttl_ms: db.collection_default_ttl_ms(collection),
111        vector_dimension: None,
112        vector_metric: None,
113        context_index_fields: Vec::new(),
114        declared_columns: Vec::new(),
115        table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116            .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117        timestamps_enabled: false,
118        context_index_enabled: false,
119        metrics_raw_retention_ms: None,
120        metrics_rollup_policies: Vec::new(),
121        metrics_tenant_identity: None,
122        metrics_namespace: None,
123        // Implicit contracts are created on first write — mutability
124        // is the default until the operator runs explicit DDL.
125        append_only: false,
126        subscriptions: Vec::new(),
127        session_key: None,
128        session_gap_ms: None,
129        retention_duration_ms: None,
130    })
131    .map(|_| ())
132    .map_err(|err| crate::RedDBError::Internal(err.to_string()))
133}
134
135fn implicit_contract_unix_ms() -> u128 {
136    std::time::SystemTime::now()
137        .duration_since(std::time::UNIX_EPOCH)
138        .unwrap_or_default()
139        .as_millis()
140}
141
142fn collection_model_allows(
143    declared_model: crate::catalog::CollectionModel,
144    requested_model: crate::catalog::CollectionModel,
145) -> bool {
146    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
147}
148
149fn ensure_vector_dimension_contract(
150    db: &crate::storage::unified::devx::RedDB,
151    collection: &str,
152    actual_dimension: usize,
153) -> RedDBResult<()> {
154    let Some(expected_dimension) = db
155        .collection_contract(collection)
156        .and_then(|contract| contract.vector_dimension)
157    else {
158        return Ok(());
159    };
160    if expected_dimension == actual_dimension {
161        return Ok(());
162    }
163    Err(crate::RedDBError::Query(format!(
164        "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
165    )))
166}
167
168fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
169    match model {
170        crate::catalog::CollectionModel::Table => "table",
171        crate::catalog::CollectionModel::Document => "document",
172        crate::catalog::CollectionModel::Graph => "graph",
173        crate::catalog::CollectionModel::Vector => "vector",
174        crate::catalog::CollectionModel::Hll => "hll",
175        crate::catalog::CollectionModel::Sketch => "sketch",
176        crate::catalog::CollectionModel::Filter => "filter",
177        crate::catalog::CollectionModel::Kv => "kv",
178        crate::catalog::CollectionModel::Config => "config",
179        crate::catalog::CollectionModel::Vault => "vault",
180        crate::catalog::CollectionModel::Mixed => "mixed",
181        crate::catalog::CollectionModel::TimeSeries => "timeseries",
182        crate::catalog::CollectionModel::Queue => "queue",
183        crate::catalog::CollectionModel::Metrics => "metrics",
184    }
185}
186
187#[derive(Clone)]
188struct UniquenessRule {
189    name: String,
190    columns: Vec<String>,
191    primary_key: bool,
192}
193
194#[derive(Copy, Clone, PartialEq, Eq)]
195enum NormalizeMode {
196    /// First write for this row. Timestamps auto-filled from now on
197    /// both `created_at` and `updated_at`; user attempts to set
198    /// either column are rejected.
199    Insert,
200    /// Update/patch path. `created_at` is preserved from the existing
201    /// row (immutable after insert); `updated_at` is bumped to now.
202    /// User attempts to set either via the patch are rejected.
203    Update,
204}
205
206mod collection_contract_enforcement {
207    use super::*;
208
209    pub(super) struct CollectionContractWriteEnforcer<'a> {
210        db: &'a crate::storage::unified::devx::RedDB,
211        collection: &'a str,
212    }
213
214    impl<'a> CollectionContractWriteEnforcer<'a> {
215        pub(super) fn new(
216            db: &'a crate::storage::unified::devx::RedDB,
217            collection: &'a str,
218        ) -> Self {
219            Self { db, collection }
220        }
221
222        pub(super) fn ensure_model(
223            &self,
224            requested_model: crate::catalog::CollectionModel,
225        ) -> RedDBResult<()> {
226            ensure_collection_model_contract(self.db, self.collection, requested_model)
227        }
228
229        pub(super) fn normalize_insert_fields(
230            &self,
231            fields: Vec<(String, Value)>,
232        ) -> RedDBResult<Vec<(String, Value)>> {
233            normalize_row_fields_for_contract_with_mode(
234                self.db,
235                self.collection,
236                fields,
237                NormalizeMode::Insert,
238            )
239        }
240
241        pub(super) fn normalize_update_fields(
242            &self,
243            fields: Vec<(String, Value)>,
244        ) -> RedDBResult<Vec<(String, Value)>> {
245            normalize_row_fields_for_contract_with_mode(
246                self.db,
247                self.collection,
248                fields,
249                NormalizeMode::Update,
250            )
251        }
252
253        pub(super) fn enforce_row_uniqueness(
254            &self,
255            fields: &[(String, Value)],
256            exclude_id: Option<crate::storage::EntityId>,
257        ) -> RedDBResult<()> {
258            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
259        }
260
261        pub(super) fn enforce_batch_uniqueness(
262            &self,
263            rows: &[Vec<(String, Value)>],
264        ) -> RedDBResult<()> {
265            enforce_row_batch_uniqueness(self.db, self.collection, rows)
266        }
267
268        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
269            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
270        }
271    }
272}
273
274use collection_contract_enforcement::CollectionContractWriteEnforcer;
275
276fn normalize_row_fields_for_contract_with_mode(
277    db: &crate::storage::unified::devx::RedDB,
278    collection: &str,
279    fields: Vec<(String, Value)>,
280    mode: NormalizeMode,
281) -> RedDBResult<Vec<(String, Value)>> {
282    let Some(contract) = db.collection_contract(collection) else {
283        return Ok(fields);
284    };
285
286    if contract.declared_model != crate::catalog::CollectionModel::Table
287        || (contract.declared_columns.is_empty()
288            && contract
289                .table_def
290                .as_ref()
291                .map(|table| table.columns.is_empty())
292                .unwrap_or(true))
293    {
294        return Ok(fields);
295    }
296
297    // Capture the pre-normalize value of created_at (if present) so
298    // Update mode can preserve it. Also capture updated_at to detect
299    // user attempts to set it via the patch payload.
300    //
301    // Heuristic for Update mode: if fields ALREADY contains a
302    // `created_at` whose value matches the row's on-disk entity, the
303    // caller is the patch pipeline carrying forward an auto-populated
304    // column — not a user mutation. Allow pass-through in that case,
305    // then restore the original value at the end.
306    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
307        fields
308            .iter()
309            .find(|(n, _)| n == "created_at")
310            .map(|(_, v)| v.clone())
311    } else {
312        None
313    };
314
315    // Reject user attempts to set runtime-managed timestamp columns.
316    // On Insert we reject any mention; on Update we only reject when
317    // the patch pipeline handed us a NEW value (not the one we
318    // auto-populated during the last insert).
319    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
320        for (name, _) in &fields {
321            if name == "created_at" || name == "updated_at" {
322                return Err(crate::RedDBError::Query(format!(
323                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
324                    collection, name
325                )));
326            }
327        }
328    }
329
330    let mut provided = std::collections::BTreeMap::new();
331    for (name, value) in &fields {
332        provided.insert(name.clone(), value.clone());
333    }
334
335    let resolved_columns = resolved_contract_columns(&contract)?;
336    let declared_names: std::collections::BTreeSet<String> = resolved_columns
337        .iter()
338        .map(|column| column.name.clone())
339        .collect();
340    let unknown_fields: Vec<String> = fields
341        .iter()
342        .filter(|(name, _)| !declared_names.contains(name))
343        .map(|(name, _)| name.clone())
344        .collect();
345    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
346        && !unknown_fields.is_empty()
347    {
348        return Err(crate::RedDBError::Query(format!(
349            "collection '{}' is strict and does not allow undeclared fields: {}",
350            collection,
351            unknown_fields.join(", ")
352        )));
353    }
354    let mut normalized = Vec::new();
355    let now_ms = current_unix_ms_u64();
356
357    for column in &resolved_columns {
358        match provided.remove(&column.name) {
359            Some(value) => {
360                // Runtime-managed columns on Update: always overwrite
361                // with the runtime's own value (preserved created_at
362                // or fresh updated_at). User mutations are silently
363                // discarded because we reject them earlier.
364                if contract.timestamps_enabled && mode == NormalizeMode::Update {
365                    match column.name.as_str() {
366                        "created_at" => {
367                            normalized.push((
368                                column.name.clone(),
369                                existing_created_at
370                                    .clone()
371                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
372                            ));
373                            continue;
374                        }
375                        "updated_at" => {
376                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
377                            continue;
378                        }
379                        _ => {}
380                    }
381                }
382                normalized.push((
383                    column.name.clone(),
384                    normalize_contract_value(collection, column, value)?,
385                ));
386            }
387            None => {
388                // Runtime-managed timestamp columns: auto-fill with now
389                // when the contract opted in. Both get the same value on
390                // first insert so callers can order by either.
391                if contract.timestamps_enabled
392                    && (column.name == "created_at" || column.name == "updated_at")
393                {
394                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
395                    continue;
396                }
397                if let Some(default) = &column.default {
398                    normalized.push((
399                        column.name.clone(),
400                        coerce_contract_literal(collection, &column.name, column, default)?,
401                    ));
402                } else if column.not_null {
403                    return Err(crate::RedDBError::Query(format!(
404                        "missing required column '{}' for collection '{}'",
405                        column.name, collection
406                    )));
407                }
408            }
409        }
410    }
411
412    for (name, value) in fields {
413        if !declared_names.contains(&name) {
414            normalized.push((name, value));
415        }
416    }
417
418    Ok(normalized)
419}
420
421fn current_unix_ms_u64() -> u64 {
422    std::time::SystemTime::now()
423        .duration_since(std::time::UNIX_EPOCH)
424        .map(|d| d.as_millis() as u64)
425        .unwrap_or(0)
426}
427
428fn enforce_row_uniqueness(
429    db: &crate::storage::unified::devx::RedDB,
430    collection: &str,
431    fields: &[(String, Value)],
432    exclude_id: Option<crate::storage::EntityId>,
433) -> RedDBResult<()> {
434    let Some(contract) = db.collection_contract(collection) else {
435        return Ok(());
436    };
437    if contract.declared_model != crate::catalog::CollectionModel::Table
438        && contract.declared_model != crate::catalog::CollectionModel::Mixed
439    {
440        return Ok(());
441    }
442
443    let rules = resolved_uniqueness_rules(&contract);
444    if rules.is_empty() {
445        return Ok(());
446    }
447
448    let store = db.store();
449    let Some(manager) = store.get_collection(collection) else {
450        return Ok(());
451    };
452
453    let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
454
455    for rule in &rules {
456        let mut expected_signatures = Vec::new();
457        let mut skip_rule = false;
458
459        for column in &rule.columns {
460            match input_fields.get(column) {
461                Some(Value::Null) | None if rule.primary_key => {
462                    return Err(crate::RedDBError::Query(format!(
463                        "primary key '{}' in collection '{}' requires non-null column '{}'",
464                        rule.name, collection, column
465                    )))
466                }
467                Some(Value::Null) | None => {
468                    skip_rule = true;
469                    break;
470                }
471                Some(value) => {
472                    expected_signatures.push((column.clone(), value_signature(value)));
473                }
474            }
475        }
476
477        if skip_rule {
478            continue;
479        }
480
481        for entity in manager.query_all(|_| true) {
482            if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
483                continue;
484            }
485            let Some(existing_fields) = row_fields_from_entity(&entity) else {
486                continue;
487            };
488
489            let duplicate = expected_signatures.iter().all(|(column, expected)| {
490                existing_fields
491                    .get(column)
492                    .map(|value| value_signature(value) == *expected)
493                    .unwrap_or(false)
494            });
495
496            if duplicate {
497                let qualifier = if rule.primary_key {
498                    "primary key"
499                } else {
500                    "unique constraint"
501                };
502                return Err(crate::RedDBError::Query(format!(
503                    "{} '{}' violated on collection '{}' for columns [{}]",
504                    qualifier,
505                    rule.name,
506                    collection,
507                    rule.columns.join(", ")
508                )));
509            }
510        }
511    }
512
513    Ok(())
514}
515
516fn enforce_row_batch_uniqueness(
517    db: &crate::storage::unified::devx::RedDB,
518    collection: &str,
519    rows: &[Vec<(String, Value)>],
520) -> RedDBResult<()> {
521    let Some(contract) = db.collection_contract(collection) else {
522        return Ok(());
523    };
524    if contract.declared_model != crate::catalog::CollectionModel::Table
525        && contract.declared_model != crate::catalog::CollectionModel::Mixed
526    {
527        return Ok(());
528    }
529
530    let rules = resolved_uniqueness_rules(&contract);
531    if rules.is_empty() {
532        return Ok(());
533    }
534
535    for rule in &rules {
536        let mut seen = std::collections::HashMap::<String, usize>::new();
537        for (row_index, fields) in rows.iter().enumerate() {
538            let input_fields: std::collections::BTreeMap<String, Value> =
539                fields.iter().cloned().collect();
540            let mut signatures = Vec::new();
541            let mut skip_rule = false;
542
543            for column in &rule.columns {
544                match input_fields.get(column) {
545                    Some(Value::Null) | None if rule.primary_key => {
546                        return Err(crate::RedDBError::Query(format!(
547                            "primary key '{}' in collection '{}' requires non-null column '{}'",
548                            rule.name, collection, column
549                        )))
550                    }
551                    Some(Value::Null) | None => {
552                        skip_rule = true;
553                        break;
554                    }
555                    Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
556                }
557            }
558
559            if skip_rule {
560                continue;
561            }
562
563            let signature = signatures.join("|");
564            if let Some(previous_index) = seen.insert(signature, row_index) {
565                return Err(crate::RedDBError::Query(format!(
566                    "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
567                    rule.name,
568                    collection,
569                    previous_index + 1,
570                    row_index + 1
571                )));
572            }
573        }
574    }
575
576    Ok(())
577}
578
579fn row_update_requires_uniqueness_check(
580    db: &crate::storage::unified::devx::RedDB,
581    collection: &str,
582    modified_columns: &[String],
583) -> bool {
584    if modified_columns.is_empty() {
585        return false;
586    }
587
588    let Some(contract) = db.collection_contract(collection) else {
589        return false;
590    };
591    if contract.declared_model != crate::catalog::CollectionModel::Table
592        && contract.declared_model != crate::catalog::CollectionModel::Mixed
593    {
594        return false;
595    }
596
597    let rules = resolved_uniqueness_rules(&contract);
598    if rules.is_empty() {
599        return false;
600    }
601
602    rules.iter().any(|rule| {
603        rule.columns.iter().any(|column| {
604            modified_columns
605                .iter()
606                .any(|modified| modified.eq_ignore_ascii_case(column))
607        })
608    })
609}
610
611pub(crate) fn build_row_update_contract_plan(
612    db: &crate::storage::unified::devx::RedDB,
613    collection: &str,
614) -> RedDBResult<Option<RowUpdateContractPlan>> {
615    let Some(contract) = db.collection_contract(collection) else {
616        return Ok(None);
617    };
618
619    let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
620        && !(contract.declared_columns.is_empty()
621            && contract
622                .table_def
623                .as_ref()
624                .map(|table| table.columns.is_empty())
625                .unwrap_or(true))
626    {
627        resolved_contract_columns(&contract)?
628            .into_iter()
629            .map(|rule| {
630                (
631                    rule.name.clone(),
632                    RowUpdateColumnRule {
633                        name: rule.name,
634                        data_type: rule.data_type,
635                        data_type_name: rule.data_type_name,
636                        not_null: rule.not_null,
637                        enum_variants: rule.enum_variants,
638                    },
639                )
640            })
641            .collect()
642    } else {
643        HashMap::new()
644    };
645
646    let unique_columns = if matches!(
647        contract.declared_model,
648        crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
649    ) {
650        resolved_uniqueness_rules(&contract)
651            .into_iter()
652            .flat_map(|rule| rule.columns.into_iter())
653            .map(|column| (column, ()))
654            .collect()
655    } else {
656        HashMap::new()
657    };
658
659    Ok(Some(RowUpdateContractPlan {
660        timestamps_enabled: contract.timestamps_enabled,
661        strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
662        declared_rules,
663        unique_columns,
664    }))
665}
666
667pub(crate) fn normalize_row_update_assignment_with_plan(
668    collection: &str,
669    column: &str,
670    value: Value,
671    row_contract_plan: Option<&RowUpdateContractPlan>,
672) -> RedDBResult<Value> {
673    let Some(plan) = row_contract_plan else {
674        return Ok(value);
675    };
676
677    if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
678        return Err(crate::RedDBError::Query(format!(
679            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
680            collection, column
681        )));
682    }
683
684    if let Some(rule) = plan.declared_rules.get(column) {
685        let rule = ResolvedColumnRule {
686            name: rule.name.clone(),
687            data_type: rule.data_type,
688            data_type_name: rule.data_type_name.clone(),
689            not_null: rule.not_null,
690            default: None,
691            enum_variants: rule.enum_variants.clone(),
692        };
693        normalize_contract_value(collection, &rule, value)
694    } else if plan.strict_schema {
695        Err(crate::RedDBError::Query(format!(
696            "collection '{}' is strict and does not allow undeclared fields: {}",
697            collection, column
698        )))
699    } else {
700        Ok(value)
701    }
702}
703
704pub(crate) fn normalize_row_update_value_for_rule(
705    collection: &str,
706    value: Value,
707    row_rule: Option<&RowUpdateColumnRule>,
708) -> RedDBResult<Value> {
709    let Some(rule) = row_rule else {
710        return Ok(value);
711    };
712
713    let rule = ResolvedColumnRule {
714        name: rule.name.clone(),
715        data_type: rule.data_type,
716        data_type_name: rule.data_type_name.clone(),
717        not_null: rule.not_null,
718        default: None,
719        enum_variants: rule.enum_variants.clone(),
720    };
721    normalize_contract_value(collection, &rule, value)
722}
723
724fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
725    if let Some(named) = row.named.as_mut() {
726        named.insert(name.to_string(), value);
727        return;
728    }
729
730    if let Some(schema) = row.schema.as_ref() {
731        if let Some(index) = schema.iter().position(|column| column == name) {
732            if let Some(slot) = row.columns.get_mut(index) {
733                *slot = value;
734                return;
735            }
736        }
737
738        let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
739        for (column, current) in schema.iter().zip(row.columns.iter()) {
740            named.insert(column.clone(), current.clone());
741        }
742        named.insert(name.to_string(), value);
743        row.named = Some(named);
744        return;
745    }
746
747    let mut named = HashMap::with_capacity(1);
748    named.insert(name.to_string(), value);
749    row.named = Some(named);
750}
751
752fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
753    if let Some(named) = row.named.as_ref() {
754        named
755            .iter()
756            .map(|(key, value)| (key.clone(), value.clone()))
757            .collect()
758    } else if let Some(schema) = row.schema.as_ref() {
759        schema
760            .iter()
761            .cloned()
762            .zip(row.columns.iter().cloned())
763            .collect()
764    } else {
765        Vec::new()
766    }
767}
768
769fn apply_row_field_assignments_raw<I>(
770    row: &mut crate::storage::unified::entity::RowData,
771    field_assignments: I,
772) where
773    I: IntoIterator<Item = (String, Value)>,
774{
775    for (column, value) in field_assignments {
776        set_row_field(row, &column, value);
777    }
778}
779
780fn apply_row_field_assignments_incremental<I>(
781    collection: &str,
782    row: &mut crate::storage::unified::entity::RowData,
783    field_assignments: I,
784    row_contract_plan: Option<&RowUpdateContractPlan>,
785) -> RedDBResult<()>
786where
787    I: IntoIterator<Item = (String, Value)>,
788{
789    for (column, value) in field_assignments {
790        let value = normalize_row_update_assignment_with_plan(
791            collection,
792            &column,
793            value,
794            row_contract_plan,
795        )?;
796
797        set_row_field(row, &column, value);
798    }
799
800    Ok(())
801}
802
803fn resolved_uniqueness_rules(
804    contract: &crate::physical::CollectionContract,
805) -> Vec<UniquenessRule> {
806    let mut rules = Vec::new();
807
808    if let Some(table_def) = &contract.table_def {
809        if !table_def.primary_key.is_empty() {
810            rules.push(UniquenessRule {
811                name: "primary_key".to_string(),
812                columns: table_def.primary_key.clone(),
813                primary_key: true,
814            });
815        }
816
817        for constraint in &table_def.constraints {
818            if matches!(
819                constraint.constraint_type,
820                crate::storage::schema::ConstraintType::PrimaryKey
821            ) && !constraint.columns.is_empty()
822            {
823                rules.push(UniquenessRule {
824                    name: constraint.name.clone(),
825                    columns: constraint.columns.clone(),
826                    primary_key: true,
827                });
828            } else if matches!(
829                constraint.constraint_type,
830                crate::storage::schema::ConstraintType::Unique
831            ) && !constraint.columns.is_empty()
832            {
833                rules.push(UniquenessRule {
834                    name: constraint.name.clone(),
835                    columns: constraint.columns.clone(),
836                    primary_key: false,
837                });
838            }
839        }
840    } else {
841        for column in &contract.declared_columns {
842            if column.primary_key {
843                rules.push(UniquenessRule {
844                    name: format!("pk_{}", column.name),
845                    columns: vec![column.name.clone()],
846                    primary_key: true,
847                });
848            } else if column.unique {
849                rules.push(UniquenessRule {
850                    name: format!("uniq_{}", column.name),
851                    columns: vec![column.name.clone()],
852                    primary_key: false,
853                });
854            }
855        }
856    }
857
858    let mut dedup = std::collections::BTreeSet::new();
859    rules
860        .into_iter()
861        .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
862        .collect()
863}
864
865fn row_fields_from_entity(
866    entity: &crate::storage::UnifiedEntity,
867) -> Option<std::collections::BTreeMap<String, Value>> {
868    match &entity.data {
869        crate::storage::EntityData::Row(row) => {
870            if let Some(named) = &row.named {
871                Some(
872                    named
873                        .iter()
874                        .map(|(key, value)| (key.clone(), value.clone()))
875                        .collect(),
876                )
877            } else {
878                row.schema.as_ref().map(|schema| {
879                    schema
880                        .iter()
881                        .cloned()
882                        .zip(row.columns.iter().cloned())
883                        .collect()
884                })
885            }
886        }
887        _ => None,
888    }
889}
890
891fn value_signature(value: &Value) -> String {
892    format!("{value:?}")
893}
894
895fn normalize_contract_value(
896    collection: &str,
897    column: &ResolvedColumnRule,
898    value: Value,
899) -> RedDBResult<Value> {
900    if matches!(value, Value::Null) {
901        if column.not_null {
902            return Err(crate::RedDBError::Query(format!(
903                "column '{}' in collection '{}' cannot be null",
904                column.name, collection
905            )));
906        }
907        return Ok(Value::Null);
908    }
909
910    let target = column.data_type;
911    if value_matches_declared_type(&value, target) {
912        return Ok(value);
913    }
914
915    let Some(raw) = value_to_coercion_input(&value) else {
916        return Err(crate::RedDBError::Query(format!(
917            "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
918            column.name, collection, column.data_type_name
919        )));
920    };
921
922    coerce_contract_literal(collection, &column.name, column, &raw)
923}
924
925fn coerce_contract_literal(
926    collection: &str,
927    column_name: &str,
928    column: &ResolvedColumnRule,
929    raw: &str,
930) -> RedDBResult<Value> {
931    let target = column.data_type;
932    match target {
933        DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
934        DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
935        DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
936            crate::RedDBError::Query(format!(
937                "failed to coerce column '{}' in collection '{}' to '{}': {}",
938                column_name, collection, column.data_type_name, err
939            ))
940        }),
941        DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
942            crate::RedDBError::Query(format!(
943                "failed to coerce column '{}' in collection '{}' to '{}': {}",
944                column_name, collection, column.data_type_name, err
945            ))
946        }),
947        DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
948            "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
949            column_name, collection, column.data_type_name
950        ))),
951        _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
952            |err| {
953                crate::RedDBError::Query(format!(
954                    "failed to coerce column '{}' in collection '{}' to '{}': {}",
955                    column_name, collection, column.data_type_name, err
956                ))
957            },
958        ),
959    }
960}
961
962struct ResolvedColumnRule {
963    name: String,
964    data_type: DataType,
965    data_type_name: String,
966    not_null: bool,
967    default: Option<String>,
968    enum_variants: Vec<String>,
969}
970
971fn resolved_contract_columns(
972    contract: &crate::physical::CollectionContract,
973) -> RedDBResult<Vec<ResolvedColumnRule>> {
974    if let Some(table_def) = &contract.table_def {
975        return Ok(table_def
976            .columns
977            .iter()
978            .map(|column| ResolvedColumnRule {
979                name: column.name.clone(),
980                data_type: column.data_type,
981                data_type_name: data_type_name(column.data_type).to_string(),
982                not_null: !column.nullable,
983                default: column
984                    .default
985                    .as_ref()
986                    .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
987                enum_variants: column.enum_variants.clone(),
988            })
989            .collect());
990    }
991
992    contract
993        .declared_columns
994        .iter()
995        .map(|column| {
996            let data_type = column
997                .sql_type
998                .as_ref()
999                .map(crate::storage::query::resolve_sql_type_name)
1000                .transpose()
1001                .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1002                .unwrap_or(parse_declared_data_type(&column.data_type)?);
1003            Ok(ResolvedColumnRule {
1004                name: column.name.clone(),
1005                data_type,
1006                data_type_name: column.data_type.clone(),
1007                not_null: column.not_null,
1008                default: column.default.clone(),
1009                enum_variants: column.enum_variants.clone(),
1010            })
1011        })
1012        .collect()
1013}
1014
1015fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1016    resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1017}
1018
1019fn data_type_name(data_type: DataType) -> &'static str {
1020    match data_type {
1021        DataType::Integer => "integer",
1022        DataType::UnsignedInteger => "unsigned_integer",
1023        DataType::Float => "float",
1024        DataType::Text => "text",
1025        DataType::Blob => "blob",
1026        DataType::Boolean => "boolean",
1027        DataType::Timestamp => "timestamp",
1028        DataType::Duration => "duration",
1029        DataType::IpAddr => "ipaddr",
1030        DataType::MacAddr => "macaddr",
1031        DataType::Vector => "vector",
1032        DataType::Nullable => "nullable",
1033        DataType::Unknown => "unknown",
1034        DataType::Json => "json",
1035        DataType::Uuid => "uuid",
1036        DataType::NodeRef => "noderef",
1037        DataType::EdgeRef => "edgeref",
1038        DataType::VectorRef => "vectorref",
1039        DataType::RowRef => "rowref",
1040        DataType::Color => "color",
1041        DataType::Email => "email",
1042        DataType::Url => "url",
1043        DataType::Phone => "phone",
1044        DataType::Semver => "semver",
1045        DataType::Cidr => "cidr",
1046        DataType::Date => "date",
1047        DataType::Time => "time",
1048        DataType::Decimal => "decimal",
1049        DataType::Enum => "enum",
1050        DataType::Array => "array",
1051        DataType::TimestampMs => "timestamp_ms",
1052        DataType::Ipv4 => "ipv4",
1053        DataType::Ipv6 => "ipv6",
1054        DataType::Subnet => "subnet",
1055        DataType::Port => "port",
1056        DataType::Latitude => "latitude",
1057        DataType::Longitude => "longitude",
1058        DataType::GeoPoint => "geopoint",
1059        DataType::Country2 => "country2",
1060        DataType::Country3 => "country3",
1061        DataType::Lang2 => "lang2",
1062        DataType::Lang5 => "lang5",
1063        DataType::Currency => "currency",
1064        DataType::AssetCode => "asset_code",
1065        DataType::Money => "money",
1066        DataType::ColorAlpha => "color_alpha",
1067        DataType::BigInt => "bigint",
1068        DataType::KeyRef => "keyref",
1069        DataType::DocRef => "docref",
1070        DataType::TableRef => "tableref",
1071        DataType::PageRef => "pageref",
1072        DataType::Secret => "secret",
1073        DataType::Password => "password",
1074        DataType::TextZstd => "text",
1075        DataType::BlobZstd => "blob",
1076    }
1077}
1078
1079fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1080    matches!(
1081        (value, target),
1082        (Value::Null, _)
1083            | (Value::Integer(_), DataType::Integer)
1084            | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1085            | (Value::Float(_), DataType::Float)
1086            | (Value::Text(_), DataType::Text)
1087            | (Value::Blob(_), DataType::Blob)
1088            | (Value::Boolean(_), DataType::Boolean)
1089            | (Value::Timestamp(_), DataType::Timestamp)
1090            | (Value::Duration(_), DataType::Duration)
1091            | (Value::IpAddr(_), DataType::IpAddr)
1092            | (Value::MacAddr(_), DataType::MacAddr)
1093            | (Value::Vector(_), DataType::Vector)
1094            | (Value::Json(_), DataType::Json)
1095            | (Value::Uuid(_), DataType::Uuid)
1096            | (Value::NodeRef(_), DataType::NodeRef)
1097            | (Value::EdgeRef(_), DataType::EdgeRef)
1098            | (Value::VectorRef(_, _), DataType::VectorRef)
1099            | (Value::RowRef(_, _), DataType::RowRef)
1100            | (Value::Color(_), DataType::Color)
1101            | (Value::Email(_), DataType::Email)
1102            | (Value::Url(_), DataType::Url)
1103            | (Value::Phone(_), DataType::Phone)
1104            | (Value::Semver(_), DataType::Semver)
1105            | (Value::Cidr(_, _), DataType::Cidr)
1106            | (Value::Date(_), DataType::Date)
1107            | (Value::Time(_), DataType::Time)
1108            | (Value::Decimal(_), DataType::Decimal)
1109            | (Value::EnumValue(_), DataType::Enum)
1110            | (Value::Array(_), DataType::Array)
1111            | (Value::TimestampMs(_), DataType::TimestampMs)
1112            | (Value::Ipv4(_), DataType::Ipv4)
1113            | (Value::Ipv6(_), DataType::Ipv6)
1114            | (Value::Subnet(_, _), DataType::Subnet)
1115            | (Value::Port(_), DataType::Port)
1116            | (Value::Latitude(_), DataType::Latitude)
1117            | (Value::Longitude(_), DataType::Longitude)
1118            | (Value::GeoPoint(_, _), DataType::GeoPoint)
1119            | (Value::Country2(_), DataType::Country2)
1120            | (Value::Country3(_), DataType::Country3)
1121            | (Value::Lang2(_), DataType::Lang2)
1122            | (Value::Lang5(_), DataType::Lang5)
1123            | (Value::Currency(_), DataType::Currency)
1124            | (Value::ColorAlpha(_), DataType::ColorAlpha)
1125            | (Value::BigInt(_), DataType::BigInt)
1126            | (Value::KeyRef(_, _), DataType::KeyRef)
1127            | (Value::DocRef(_, _), DataType::DocRef)
1128            | (Value::TableRef(_), DataType::TableRef)
1129            | (Value::PageRef(_), DataType::PageRef)
1130            | (Value::Secret(_), DataType::Secret)
1131            | (Value::Password(_), DataType::Password)
1132    )
1133}
1134
1135fn value_to_coercion_input(value: &Value) -> Option<String> {
1136    match value {
1137        Value::Null => None,
1138        Value::Integer(value) => Some(value.to_string()),
1139        Value::UnsignedInteger(value) => Some(value.to_string()),
1140        Value::Float(value) => Some(value.to_string()),
1141        Value::Text(value) => Some(value.to_string()),
1142        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1143        Value::Boolean(value) => Some(value.to_string()),
1144        Value::Timestamp(value) => Some(value.to_string()),
1145        Value::Duration(value) => Some(value.to_string()),
1146        Value::IpAddr(value) => Some(value.to_string()),
1147        Value::MacAddr(value) => Some(format!(
1148            "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1149            value[0], value[1], value[2], value[3], value[4], value[5]
1150        )),
1151        Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1152        Value::Email(value) => Some(value.clone()),
1153        Value::Url(value) => Some(value.clone()),
1154        Value::Phone(value) => Some(value.to_string()),
1155        Value::Semver(value) => Some(format!(
1156            "{}.{}.{}",
1157            value / 1_000_000,
1158            (value / 1_000) % 1_000,
1159            value % 1_000
1160        )),
1161        Value::Date(value) => Some(value.to_string()),
1162        Value::Time(value) => Some(value.to_string()),
1163        Value::Decimal(value) => Some(value.to_string()),
1164        Value::TimestampMs(value) => Some(value.to_string()),
1165        Value::Ipv4(value) => Some(format!(
1166            "{}.{}.{}.{}",
1167            (value >> 24) & 0xFF,
1168            (value >> 16) & 0xFF,
1169            (value >> 8) & 0xFF,
1170            value & 0xFF
1171        )),
1172        Value::Port(value) => Some(value.to_string()),
1173        Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1174        Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1175        Value::GeoPoint(lat, lon) => Some(format!(
1176            "{},{}",
1177            *lat as f64 / 1_000_000.0,
1178            *lon as f64 / 1_000_000.0
1179        )),
1180        Value::BigInt(value) => Some(value.to_string()),
1181        Value::TableRef(value) => Some(value.clone()),
1182        Value::PageRef(value) => Some(value.to_string()),
1183        Value::Password(value) => Some(value.clone()),
1184        _ => None,
1185    }
1186}
1187
1188fn dedupe_modified_columns(mut modified_columns: Vec<String>) -> Vec<String> {
1189    if modified_columns.is_empty() {
1190        return modified_columns;
1191    }
1192
1193    let mut unique = Vec::with_capacity(modified_columns.len());
1194    for column in modified_columns.drain(..) {
1195        if !unique
1196            .iter()
1197            .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1198        {
1199            unique.push(column);
1200        }
1201    }
1202    unique
1203}
1204
1205fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1206    if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1207        return Err(crate::RedDBError::Query(
1208            "array positional document patch paths are unsupported; replace the array or full document body instead"
1209                .to_string(),
1210        ));
1211    }
1212    Ok(())
1213}
1214
1215fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1216    match fields.get("body") {
1217        Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1218            crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1219        }),
1220        Some(_) => Err(crate::RedDBError::Query(
1221            "document body field must contain JSON".to_string(),
1222        )),
1223        None => Ok(JsonValue::Object(Default::default())),
1224    }
1225}
1226
1227fn replace_document_row_body(
1228    fields: &mut HashMap<String, Value>,
1229    body: JsonValue,
1230    modified_columns: &mut Vec<String>,
1231) -> RedDBResult<()> {
1232    modified_columns.push("body".to_string());
1233
1234    let old_keys: Vec<String> = fields
1235        .keys()
1236        .filter(|key| key.as_str() != "body")
1237        .cloned()
1238        .collect();
1239    for key in old_keys {
1240        fields.remove(&key);
1241        modified_columns.push(key);
1242    }
1243
1244    let body_bytes = json_to_vec(&body).map_err(|err| {
1245        crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1246    })?;
1247    fields.insert("body".to_string(), Value::Json(body_bytes));
1248
1249    if let JsonValue::Object(map) = &body {
1250        for (key, value) in map {
1251            fields.insert(key.clone(), json_to_storage_value(value)?);
1252            modified_columns.push(key.clone());
1253        }
1254    }
1255
1256    Ok(())
1257}
1258
1259impl RedDBRuntime {
1260    pub(crate) fn apply_loaded_patch_entity_core(
1261        &self,
1262        collection: String,
1263        mut entity: crate::storage::UnifiedEntity,
1264        payload: JsonValue,
1265        operations: Vec<PatchEntityOperation>,
1266    ) -> RedDBResult<AppliedEntityMutation> {
1267        let id = entity.id;
1268        let operations = normalize_ttl_patch_operations(operations)?;
1269        // Snapshot pre-patch row fields for the secondary-index hook —
1270        // empty for non-row entities, which is the desired no-op.
1271        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1272
1273        let db = self.db();
1274        let store = db.store();
1275        let Some(manager) = store.get_collection(&collection) else {
1276            return Err(crate::RedDBError::NotFound(format!(
1277                "collection not found: {collection}"
1278            )));
1279        };
1280
1281        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1282        let mut metadata_changed = false;
1283        let mut modified_columns: Vec<String> = Vec::new();
1284        let mut context_index_dirty = false;
1285        let mut graph_node_type: Option<String> = None;
1286        let mut graph_edge_weight: Option<f32> = None;
1287
1288        let row_contract_timestamps = db
1289            .collection_contract(&collection)
1290            .map(|c| c.timestamps_enabled)
1291            .unwrap_or(false);
1292
1293        match &mut entity.data {
1294            crate::storage::EntityData::Row(row) => {
1295                let is_document_collection = db
1296                    .collection_contract(&collection)
1297                    .map(|contract| {
1298                        contract.declared_model == crate::catalog::CollectionModel::Document
1299                    })
1300                    .unwrap_or(false);
1301                let mut field_ops = Vec::new();
1302                let mut metadata_ops = Vec::new();
1303                let mut document_body_ops = Vec::new();
1304
1305                for mut op in operations {
1306                    let Some(root) = op.path.first().map(String::as_str) else {
1307                        return Err(crate::RedDBError::Query(
1308                            "patch path cannot be empty".to_string(),
1309                        ));
1310                    };
1311
1312                    match root {
1313                        "body" if is_document_collection => {
1314                            if op.path.len() < 2 {
1315                                return Err(crate::RedDBError::Query(
1316                                    "document body patch paths require a nested key; use payload.body for full replacement"
1317                                        .to_string(),
1318                                ));
1319                            }
1320                            op.path.remove(0);
1321                            reject_document_array_position_path(&op.path)?;
1322                            document_body_ops.push(op);
1323                        }
1324                        "fields" | "named" => {
1325                            if op.path.len() < 2 {
1326                                return Err(crate::RedDBError::Query(
1327                                    "patch path 'fields' requires a nested key".to_string(),
1328                                ));
1329                            }
1330                            if row_contract_timestamps {
1331                                let leaf = op.path.get(1).map(String::as_str);
1332                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
1333                                    return Err(crate::RedDBError::Query(format!(
1334                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1335                                        collection,
1336                                        leaf.unwrap_or("")
1337                                    )));
1338                                }
1339                            }
1340                            op.path.remove(0);
1341                            field_ops.push(op);
1342                        }
1343                        "metadata" => {
1344                            if op.path.len() < 2 {
1345                                return Err(crate::RedDBError::Query(
1346                                    "patch path 'metadata' requires a nested key".to_string(),
1347                                ));
1348                            }
1349                            op.path.remove(0);
1350                            metadata_ops.push(op);
1351                        }
1352                        _ => {
1353                            return Err(crate::RedDBError::Query(format!(
1354                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1355                            )));
1356                        }
1357                    }
1358                }
1359
1360                if !document_body_ops.is_empty() {
1361                    context_index_dirty = true;
1362                    let named = row.named.get_or_insert_with(Default::default);
1363                    let mut body = document_body_from_named(named)?;
1364                    apply_patch_operations_to_json(&mut body, &document_body_ops)
1365                        .map_err(crate::RedDBError::Query)?;
1366                    replace_document_row_body(named, body, &mut modified_columns)?;
1367                }
1368
1369                if is_document_collection {
1370                    if let Some(body) = payload.get("body") {
1371                        context_index_dirty = true;
1372                        let named = row.named.get_or_insert_with(Default::default);
1373                        replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1374                    }
1375                }
1376
1377                if !field_ops.is_empty() {
1378                    context_index_dirty = true;
1379                    for op in &field_ops {
1380                        if let Some(col) = op.path.first() {
1381                            modified_columns.push(col.clone());
1382                        }
1383                    }
1384                    let named = row.named.get_or_insert_with(Default::default);
1385                    apply_patch_operations_to_storage_map(named, &field_ops)?;
1386                }
1387
1388                if let Some(fields) = payload
1389                    .get("fields")
1390                    .and_then(crate::json::Value::as_object)
1391                {
1392                    if row_contract_timestamps {
1393                        for key in fields.keys() {
1394                            if key == "created_at" || key == "updated_at" {
1395                                return Err(crate::RedDBError::Query(format!(
1396                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1397                                    collection, key
1398                                )));
1399                            }
1400                        }
1401                    }
1402                    context_index_dirty = true;
1403                    let named = row.named.get_or_insert_with(Default::default);
1404                    for (key, value) in fields {
1405                        modified_columns.push(key.clone());
1406                        named.insert(key.clone(), json_to_storage_value(value)?);
1407                    }
1408                }
1409
1410                if !metadata_ops.is_empty() {
1411                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1412                    let metadata = patch_metadata.get_or_insert_with(|| {
1413                        store.get_metadata(&collection, id).unwrap_or_default()
1414                    });
1415                    let mut metadata_json = metadata_to_json(metadata);
1416                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1417                        .map_err(crate::RedDBError::Query)?;
1418                    *metadata = metadata_from_json(&metadata_json)?;
1419                    metadata_changed = true;
1420                }
1421
1422                if !modified_columns.is_empty() || row_contract_timestamps {
1423                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1424                    let current_fields = if let Some(named) = row.named.take() {
1425                        named.into_iter().collect::<Vec<_>>()
1426                    } else if let Some(schema) = row.schema.as_ref() {
1427                        schema
1428                            .iter()
1429                            .cloned()
1430                            .zip(row.columns.iter().cloned())
1431                            .collect::<Vec<_>>()
1432                    } else {
1433                        Vec::new()
1434                    };
1435                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
1436                    if row_contract_timestamps {
1437                        modified_columns.push("updated_at".to_string());
1438                        context_index_dirty = true;
1439                    }
1440                    if contract.requires_uniqueness_check(&modified_columns) {
1441                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1442                    }
1443                    row.named = Some(normalized_fields.into_iter().collect());
1444                }
1445            }
1446            crate::storage::EntityData::Node(node) => {
1447                let mut field_ops = Vec::new();
1448                let mut metadata_ops = Vec::new();
1449                let mut node_type_ops = Vec::new();
1450
1451                for mut op in operations {
1452                    let Some(root) = op.path.first().map(String::as_str) else {
1453                        return Err(crate::RedDBError::Query(
1454                            "patch path cannot be empty".to_string(),
1455                        ));
1456                    };
1457
1458                    match root {
1459                        "fields" | "properties" => {
1460                            if op.path.len() < 2 {
1461                                return Err(crate::RedDBError::Query(
1462                                    "patch path 'fields' requires a nested key".to_string(),
1463                                ));
1464                            }
1465                            op.path.remove(0);
1466                            field_ops.push(op);
1467                        }
1468                        "metadata" => {
1469                            if op.path.len() < 2 {
1470                                return Err(crate::RedDBError::Query(
1471                                    "patch path 'metadata' requires a nested key".to_string(),
1472                                ));
1473                            }
1474                            op.path.remove(0);
1475                            metadata_ops.push(op);
1476                        }
1477                        "node_type" => {
1478                            if op.path.len() != 1 {
1479                                return Err(crate::RedDBError::Query(
1480                                    "patch path 'node_type' does not allow nested keys".to_string(),
1481                                ));
1482                            }
1483                            op.path.clear();
1484                            node_type_ops.push(op);
1485                        }
1486                        _ => {
1487                            return Err(crate::RedDBError::Query(format!(
1488                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1489                            )));
1490                        }
1491                    }
1492                }
1493
1494                for op in node_type_ops {
1495                    context_index_dirty = true;
1496                    let value = op.value.ok_or_else(|| {
1497                        crate::RedDBError::Query("node_type operations require a value".to_string())
1498                    })?;
1499
1500                    match op.op {
1501                        PatchEntityOperationType::Unset => {
1502                            return Err(crate::RedDBError::Query(
1503                                "node_type cannot be unset through patch operations".to_string(),
1504                            ));
1505                        }
1506                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1507                            let Some(node_type) = value.as_str() else {
1508                                return Err(crate::RedDBError::Query(
1509                                    "node_type operation requires a text value".to_string(),
1510                                ));
1511                            };
1512                            graph_node_type = Some(node_type.to_string());
1513                            modified_columns.push("node_type".to_string());
1514                        }
1515                    }
1516                }
1517
1518                if !field_ops.is_empty() {
1519                    context_index_dirty = true;
1520                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1521                }
1522
1523                if let Some(fields) = payload
1524                    .get("fields")
1525                    .and_then(crate::json::Value::as_object)
1526                {
1527                    context_index_dirty = true;
1528                    for (key, value) in fields {
1529                        node.properties
1530                            .insert(key.clone(), json_to_storage_value(value)?);
1531                    }
1532                }
1533
1534                if !metadata_ops.is_empty() {
1535                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1536                    let metadata = patch_metadata.get_or_insert_with(|| {
1537                        store.get_metadata(&collection, id).unwrap_or_default()
1538                    });
1539                    let mut metadata_json = metadata_to_json(metadata);
1540                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1541                        .map_err(crate::RedDBError::Query)?;
1542                    *metadata = metadata_from_json(&metadata_json)?;
1543                    metadata_changed = true;
1544                }
1545            }
1546            crate::storage::EntityData::Edge(edge) => {
1547                let mut field_ops = Vec::new();
1548                let mut metadata_ops = Vec::new();
1549                let mut weight_ops = Vec::new();
1550
1551                for mut op in operations {
1552                    let Some(root) = op.path.first().map(String::as_str) else {
1553                        return Err(crate::RedDBError::Query(
1554                            "patch path cannot be empty".to_string(),
1555                        ));
1556                    };
1557
1558                    match root {
1559                        "fields" | "properties" => {
1560                            if op.path.len() < 2 {
1561                                return Err(crate::RedDBError::Query(
1562                                    "patch path 'fields' requires a nested key".to_string(),
1563                                ));
1564                            }
1565                            op.path.remove(0);
1566                            field_ops.push(op);
1567                        }
1568                        "weight" => {
1569                            if op.path.len() != 1 {
1570                                return Err(crate::RedDBError::Query(
1571                                    "patch path 'weight' does not allow nested keys".to_string(),
1572                                ));
1573                            }
1574                            op.path.clear();
1575                            weight_ops.push(op);
1576                        }
1577                        "metadata" => {
1578                            if op.path.len() < 2 {
1579                                return Err(crate::RedDBError::Query(
1580                                    "patch path 'metadata' requires a nested key".to_string(),
1581                                ));
1582                            }
1583                            op.path.remove(0);
1584                            metadata_ops.push(op);
1585                        }
1586                        _ => {
1587                            return Err(crate::RedDBError::Query(format!(
1588                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1589                            )));
1590                        }
1591                    }
1592                }
1593
1594                if !field_ops.is_empty() {
1595                    context_index_dirty = true;
1596                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1597                }
1598
1599                for op in weight_ops {
1600                    context_index_dirty = true;
1601                    let value = op.value.ok_or_else(|| {
1602                        crate::RedDBError::Query("weight operations require a value".to_string())
1603                    })?;
1604
1605                    match op.op {
1606                        PatchEntityOperationType::Unset => {
1607                            return Err(crate::RedDBError::Query(
1608                                "weight cannot be unset through patch operations".to_string(),
1609                            ));
1610                        }
1611                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1612                            let Some(weight) = value.as_f64() else {
1613                                return Err(crate::RedDBError::Query(
1614                                    "weight operation requires a numeric value".to_string(),
1615                                ));
1616                            };
1617                            graph_edge_weight = Some(weight as f32);
1618                            edge.weight = weight as f32;
1619                            modified_columns.push("weight".to_string());
1620                        }
1621                    }
1622                }
1623
1624                if let Some(fields) = payload
1625                    .get("fields")
1626                    .and_then(crate::json::Value::as_object)
1627                {
1628                    context_index_dirty = true;
1629                    for (key, value) in fields {
1630                        edge.properties
1631                            .insert(key.clone(), json_to_storage_value(value)?);
1632                    }
1633                }
1634
1635                if !metadata_ops.is_empty() {
1636                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1637                    let metadata = patch_metadata.get_or_insert_with(|| {
1638                        store.get_metadata(&collection, id).unwrap_or_default()
1639                    });
1640                    let mut metadata_json = metadata_to_json(metadata);
1641                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1642                        .map_err(crate::RedDBError::Query)?;
1643                    *metadata = metadata_from_json(&metadata_json)?;
1644                    metadata_changed = true;
1645                }
1646            }
1647            crate::storage::EntityData::Vector(vector) => {
1648                let mut field_ops = Vec::new();
1649                let mut metadata_ops = Vec::new();
1650
1651                for mut op in operations {
1652                    let Some(root) = op.path.first().map(String::as_str) else {
1653                        return Err(crate::RedDBError::Query(
1654                            "patch path cannot be empty".to_string(),
1655                        ));
1656                    };
1657
1658                    match root {
1659                        "fields" => {
1660                            if op.path.len() < 2 {
1661                                return Err(crate::RedDBError::Query(
1662                                    "patch path 'fields' requires a nested key".to_string(),
1663                                ));
1664                            }
1665                            op.path.remove(0);
1666                            let Some(target) = op.path.first().map(String::as_str) else {
1667                                return Err(crate::RedDBError::Query(
1668                                    "patch path requires a target under fields".to_string(),
1669                                ));
1670                            };
1671                            if !matches!(target, "dense" | "content" | "sparse") {
1672                                return Err(crate::RedDBError::Query(format!(
1673                                    "unsupported vector patch target '{target}'"
1674                                )));
1675                            }
1676                            field_ops.push(op);
1677                        }
1678                        "metadata" => {
1679                            if op.path.len() < 2 {
1680                                return Err(crate::RedDBError::Query(
1681                                    "patch path 'metadata' requires a nested key".to_string(),
1682                                ));
1683                            }
1684                            op.path.remove(0);
1685                            metadata_ops.push(op);
1686                        }
1687                        _ => {
1688                            return Err(crate::RedDBError::Query(format!(
1689                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1690                            )));
1691                        }
1692                    }
1693                }
1694
1695                if !field_ops.is_empty() {
1696                    context_index_dirty = true;
1697                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1698                }
1699
1700                if let Some(fields) = payload
1701                    .get("fields")
1702                    .and_then(crate::json::Value::as_object)
1703                {
1704                    context_index_dirty = true;
1705                    if let Some(content) =
1706                        fields.get("content").and_then(crate::json::Value::as_str)
1707                    {
1708                        vector.content = Some(content.to_string());
1709                    }
1710                    if let Some(dense) = fields.get("dense") {
1711                        vector.dense = dense
1712                            .as_array()
1713                            .ok_or_else(|| {
1714                                crate::RedDBError::Query(
1715                                    "field 'dense' must be an array".to_string(),
1716                                )
1717                            })?
1718                            .iter()
1719                            .map(|value| {
1720                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
1721                                    crate::RedDBError::Query(
1722                                        "field 'dense' must contain only numbers".to_string(),
1723                                    )
1724                                })
1725                            })
1726                            .collect::<Result<Vec<_>, _>>()?;
1727                    }
1728                }
1729
1730                if !metadata_ops.is_empty() {
1731                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1732                    let metadata = patch_metadata.get_or_insert_with(|| {
1733                        store.get_metadata(&collection, id).unwrap_or_default()
1734                    });
1735                    let mut metadata_json = metadata_to_json(metadata);
1736                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1737                        .map_err(crate::RedDBError::Query)?;
1738                    *metadata = metadata_from_json(&metadata_json)?;
1739                    metadata_changed = true;
1740                }
1741            }
1742            crate::storage::EntityData::TimeSeries(_)
1743            | crate::storage::EntityData::QueueMessage(_) => {
1744                return Err(crate::RedDBError::Query(
1745                    "patch operations are not supported for TimeSeries or QueueMessage entities"
1746                        .to_string(),
1747                ));
1748            }
1749        }
1750
1751        if let Some(node_type) = graph_node_type {
1752            if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1753                node.node_type = node_type;
1754            }
1755        }
1756        if let Some(weight) = graph_edge_weight {
1757            if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1758                edge.weight = (weight * 1000.0) as u32;
1759            }
1760        }
1761
1762        if let Some(metadata) = payload
1763            .get("metadata")
1764            .and_then(crate::json::Value::as_object)
1765        {
1766            let patch_metadata = patch_metadata
1767                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1768            for (key, value) in metadata {
1769                ensure_non_tree_reserved_metadata_key(key)?;
1770                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1771            }
1772            metadata_changed = true;
1773        }
1774
1775        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1776            let patch_metadata = patch_metadata
1777                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1778            if matches!(value, crate::storage::unified::MetadataValue::Null) {
1779                patch_metadata.remove(&key);
1780            } else {
1781                patch_metadata.set(key, value);
1782            }
1783            metadata_changed = true;
1784        }
1785
1786        entity.updated_at = std::time::SystemTime::now()
1787            .duration_since(std::time::UNIX_EPOCH)
1788            .unwrap_or_default()
1789            .as_secs();
1790
1791        modified_columns = dedupe_modified_columns(modified_columns);
1792
1793        Ok(AppliedEntityMutation {
1794            id,
1795            collection,
1796            entity,
1797            metadata: patch_metadata,
1798            modified_columns,
1799            persist_metadata: metadata_changed,
1800            context_index_dirty,
1801            replaced_entity: None,
1802            replaced_entity_previous_xmax: 0,
1803            pre_mutation_fields,
1804        })
1805    }
1806
1807    pub(crate) fn apply_loaded_sql_update_row_core(
1808        &self,
1809        collection: String,
1810        mut entity: crate::storage::UnifiedEntity,
1811        static_field_assignments: &[(String, Value)],
1812        dynamic_field_assignments: Vec<(String, Value)>,
1813        static_metadata_assignments: &[(String, MetadataValue)],
1814        dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1815        row_contract_plan: Option<&RowUpdateContractPlan>,
1816        row_modified_columns_template: &[String],
1817        row_touches_unique_columns: bool,
1818    ) -> RedDBResult<AppliedEntityMutation> {
1819        let id = entity.id;
1820        let previous_xmax = entity.xmax;
1821        let db = self.db();
1822        let store = db.store();
1823        let Some(_) = store.get_collection(&collection) else {
1824            return Err(crate::RedDBError::NotFound(format!(
1825                "collection not found: {collection}"
1826            )));
1827        };
1828
1829        let versioned_update_xid = match self.current_xid() {
1830            Some(xid) => Some(xid),
1831            None => {
1832                let snapshot_manager = self.snapshot_manager();
1833                let xid = snapshot_manager.begin();
1834                snapshot_manager.commit(xid);
1835                Some(xid)
1836            }
1837        };
1838        let mut replaced_entity = versioned_update_xid.map(|xid| {
1839            let mut old = entity.clone();
1840            old.set_xmax(xid);
1841            old
1842        });
1843
1844        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1845        let row_contract_timestamps = row_contract_plan
1846            .map(|plan| plan.timestamps_enabled)
1847            .unwrap_or(false);
1848        let mut metadata_changed = false;
1849        let mut modified_columns = row_modified_columns_template.to_vec();
1850        let mut context_index_dirty = !modified_columns.is_empty();
1851
1852        // Snapshot OLD field values BEFORE applying the assignments —
1853        // the secondary-index maintenance hook needs both before/after to
1854        // delete-then-insert under changed indexed columns.
1855        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1856
1857        let crate::storage::EntityData::Row(row) = &mut entity.data else {
1858            return Err(crate::RedDBError::Query(
1859                "SQL row update fast path requires a row entity".to_string(),
1860            ));
1861        };
1862
1863        let _ = row_contract_plan;
1864        apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1865        apply_row_field_assignments_raw(row, dynamic_field_assignments);
1866
1867        for (key, value) in static_metadata_assignments
1868            .iter()
1869            .cloned()
1870            .chain(dynamic_metadata_assignments)
1871        {
1872            ensure_non_tree_reserved_metadata_key(&key)?;
1873            patch_metadata
1874                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1875                .set(key, value);
1876            metadata_changed = true;
1877        }
1878
1879        if !modified_columns.is_empty() || row_contract_timestamps {
1880            let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1881            if row_contract_timestamps {
1882                context_index_dirty = true;
1883                set_row_field(
1884                    row,
1885                    "updated_at",
1886                    Value::UnsignedInteger(current_unix_ms_u64()),
1887                );
1888                modified_columns.push("updated_at".to_string());
1889            }
1890            if row_touches_unique_columns {
1891                let current_fields = collect_row_fields(row);
1892                contract.enforce_row_uniqueness(&current_fields, Some(id))?;
1893            }
1894        }
1895
1896        entity.updated_at = std::time::SystemTime::now()
1897            .duration_since(std::time::UNIX_EPOCH)
1898            .unwrap_or_default()
1899            .as_secs();
1900
1901        if let Some(xid) = versioned_update_xid {
1902            let logical_id = entity.logical_id();
1903            entity.id = store.next_entity_id();
1904            entity.set_logical_id(logical_id);
1905            entity.set_xmin(xid);
1906            entity.set_xmax(0);
1907            if let Some(old) = replaced_entity.as_mut() {
1908                old.set_xmax(xid);
1909            }
1910        }
1911
1912        modified_columns = dedupe_modified_columns(modified_columns);
1913
1914        Ok(AppliedEntityMutation {
1915            id: entity.id,
1916            collection,
1917            entity,
1918            metadata: patch_metadata,
1919            modified_columns,
1920            persist_metadata: metadata_changed,
1921            context_index_dirty,
1922            replaced_entity,
1923            replaced_entity_previous_xmax: previous_xmax,
1924            pre_mutation_fields,
1925        })
1926    }
1927
1928    pub(crate) fn persist_applied_entity_mutations(
1929        &self,
1930        applied: &[AppliedEntityMutation],
1931    ) -> RedDBResult<()> {
1932        if applied.is_empty() {
1933            return Ok(());
1934        }
1935
1936        let store = self.db().store();
1937        let collection = &applied[0].collection;
1938        let Some(manager) = store.get_collection(collection) else {
1939            return Err(crate::RedDBError::NotFound(format!(
1940                "collection not found: {collection}"
1941            )));
1942        };
1943
1944        let mut ordinary = Vec::with_capacity(applied.len());
1945        for item in applied {
1946            if let Some(old_version) = item.replaced_entity.as_ref() {
1947                store
1948                    .install_versioned_table_row_update(
1949                        collection,
1950                        old_version.clone(),
1951                        item.entity.clone(),
1952                        if item.persist_metadata {
1953                            item.metadata.as_ref()
1954                        } else {
1955                            None
1956                        },
1957                    )
1958                    .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1959                if self.current_xid().is_some() {
1960                    self.record_pending_versioned_update(
1961                        crate::runtime::impl_core::current_connection_id(),
1962                        collection,
1963                        old_version.id,
1964                        item.entity.id,
1965                        old_version.xmax,
1966                        item.replaced_entity_previous_xmax,
1967                    );
1968                }
1969            } else {
1970                ordinary.push(item);
1971            }
1972        }
1973        if ordinary.is_empty() {
1974            return Ok(());
1975        }
1976
1977        manager
1978            .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1979                (
1980                    &item.entity,
1981                    item.modified_columns.as_slice(),
1982                    if item.persist_metadata {
1983                        item.metadata.as_ref()
1984                    } else {
1985                        None
1986                    },
1987                )
1988            }))
1989            .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1990
1991        // PG-HOT-like fast path: segment in-place is done; when no
1992        // mutation touches a secondary-indexed column AND no metadata
1993        // payload needs to be folded into a B-tree record, skip the
1994        // in-line B-tree upsert. The WAL still records the update
1995        // (durability preserved; recovery replay rebuilds the B-tree),
1996        // and `manager.get()` prefers the live segment over the
1997        // B-tree for reads — so the short-circuit is invisible to
1998        // callers. See `persist_entities_to_pager_wal_only`.
1999        let indexed_cols = self
2000            .index_store_ref()
2001            .indexed_columns_set(collection.as_str());
2002        let all_hot = !indexed_cols.is_empty()
2003            && ordinary.iter().all(|item| {
2004                !item.persist_metadata
2005                    && !item
2006                        .modified_columns
2007                        .iter()
2008                        .any(|c| indexed_cols.contains(c))
2009            })
2010            || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2011
2012        // Pass `&[&UnifiedEntity]` — no per-entity clone. The SQL UPDATE
2013        // inner loop hands us `applied` which already owns the post-image
2014        // entity; all we need for the persist path is a read borrow.
2015        let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2016            ordinary.iter().map(|item| &item.entity).collect();
2017        let persist_fn = if all_hot {
2018            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2019        } else {
2020            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2021        };
2022        persist_fn(store.as_ref(), collection, &entity_refs)
2023            .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2024    }
2025
2026    pub(crate) fn flush_applied_entity_mutation(
2027        &self,
2028        applied: &AppliedEntityMutation,
2029    ) -> RedDBResult<()> {
2030        let store = self.db().store();
2031        if applied.context_index_dirty {
2032            store
2033                .context_index()
2034                .index_entity(&applied.collection, &applied.entity);
2035        }
2036        // Secondary-index maintenance for SQL UPDATE / JSON-Patch flows.
2037        // Skip when pre_mutation_fields is empty (entity wasn't a row, or
2038        // didn't carry recoverable column names) — there's nothing to
2039        // delete-then-insert in that case.
2040        //
2041        // Also build the CDC damage vector here so downstream consumers
2042        // see which columns changed without re-diffing.
2043        let mut changed_columns: Option<Vec<String>> = None;
2044        if !applied.pre_mutation_fields.is_empty() {
2045            let post = entity_row_fields_snapshot(&applied.entity);
2046            if !post.is_empty() {
2047                let damage = crate::application::entity::row_damage_vector(
2048                    &applied.pre_mutation_fields,
2049                    &post,
2050                );
2051                if !damage.is_empty() {
2052                    changed_columns = Some(
2053                        damage
2054                            .touched_columns()
2055                            .into_iter()
2056                            .map(str::to_string)
2057                            .collect(),
2058                    );
2059                }
2060
2061                // HOT-like fast path (P3.T2/T3): when no modified
2062                // column is covered by a secondary index, skip the
2063                // `index_entity_update` call entirely. The function
2064                // would short-circuit internally, but the call still
2065                // reads the registry lock + walks the damage vector
2066                // — avoiding it saves a few microseconds per UPDATE.
2067                // Page-local replace + t_ctid chain support (true
2068                // HOT) lives in a follow-up storage spec.
2069                let indexed_cols: std::collections::HashSet<String> = self
2070                    .index_store_ref()
2071                    .list_indices(applied.collection.as_str())
2072                    .into_iter()
2073                    .filter_map(|idx| idx.columns.first().cloned())
2074                    .collect();
2075                let modified_cols: std::collections::HashSet<String> = damage
2076                    .touched_columns()
2077                    .into_iter()
2078                    .map(str::to_string)
2079                    .collect();
2080                if let Some(old_version) = applied.replaced_entity.as_ref() {
2081                    let old_index_fields: Vec<(String, Value)> = applied
2082                        .pre_mutation_fields
2083                        .iter()
2084                        .filter(|(col, _)| indexed_cols.contains(col))
2085                        .cloned()
2086                        .collect();
2087                    let new_index_fields: Vec<(String, Value)> = post
2088                        .iter()
2089                        .filter(|(col, _)| indexed_cols.contains(col))
2090                        .cloned()
2091                        .collect();
2092                    if !old_index_fields.is_empty() {
2093                        self.index_store_ref()
2094                            .index_entity_delete(
2095                                &applied.collection,
2096                                old_version.id,
2097                                &old_index_fields,
2098                            )
2099                            .map_err(crate::RedDBError::Internal)?;
2100                    }
2101                    if !new_index_fields.is_empty() {
2102                        self.index_store_ref()
2103                            .index_entity_insert(
2104                                &applied.collection,
2105                                applied.entity.id,
2106                                &new_index_fields,
2107                            )
2108                            .map_err(crate::RedDBError::Internal)?;
2109                    }
2110                } else {
2111                    let decision = crate::storage::engine::hot_update::decide(
2112                        &crate::storage::engine::hot_update::HotUpdateInputs {
2113                            collection: applied.collection.as_str(),
2114                            indexed_columns: &indexed_cols,
2115                            modified_columns: &modified_cols,
2116                            // The storage layer currently handles fit via
2117                            // the segment abstraction; we bypass the
2118                            // page-size check here.
2119                            new_tuple_size: 0,
2120                            page_free_space: usize::MAX,
2121                        },
2122                    );
2123                    if !decision.can_hot {
2124                        self.index_store_ref()
2125                            .index_entity_update(
2126                                &applied.collection,
2127                                applied.id,
2128                                &applied.pre_mutation_fields,
2129                                &post,
2130                            )
2131                            .map_err(crate::RedDBError::Internal)?;
2132                    } else {
2133                        // F-04: `applied.collection` is tenant-supplied;
2134                        // strip CR/LF/control bytes via the LogField
2135                        // escaper (ADR 0010).
2136                        tracing::debug!(
2137                            collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2138                            "hot_update fast-path: skipped index_entity_update"
2139                        );
2140                    }
2141                }
2142            }
2143        }
2144        self.cdc_emit_prebuilt_with_columns(
2145            crate::replication::cdc::ChangeOperation::Update,
2146            &applied.collection,
2147            &applied.entity,
2148            "entity",
2149            applied.metadata.as_ref(),
2150            true,
2151            changed_columns,
2152        );
2153        Ok(())
2154    }
2155
2156    pub(crate) fn apply_loaded_patch_entity(
2157        &self,
2158        collection: String,
2159        entity: crate::storage::UnifiedEntity,
2160        payload: JsonValue,
2161        operations: Vec<PatchEntityOperation>,
2162    ) -> RedDBResult<CreateEntityOutput> {
2163        let applied =
2164            self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2165        self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2166        self.flush_applied_entity_mutation(&applied)?;
2167        Ok(CreateEntityOutput {
2168            id: applied.id,
2169            entity: Some(applied.entity),
2170        })
2171    }
2172}
2173
2174fn ensure_non_tree_reserved_metadata_patch_paths(
2175    operations: &[PatchEntityOperation],
2176) -> RedDBResult<()> {
2177    for operation in operations {
2178        let Some(key) = operation.path.first().map(String::as_str) else {
2179            continue;
2180        };
2181        ensure_non_tree_reserved_metadata_key(key)?;
2182    }
2183    Ok(())
2184}
2185
2186fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2187    if key.starts_with(TREE_METADATA_PREFIX) {
2188        return Err(crate::RedDBError::Query(format!(
2189            "metadata key '{}' is reserved for managed trees",
2190            key
2191        )));
2192    }
2193    Ok(())
2194}
2195
2196fn ensure_non_tree_reserved_metadata_entries(
2197    metadata: &[(String, MetadataValue)],
2198) -> RedDBResult<()> {
2199    for (key, _) in metadata {
2200        ensure_non_tree_reserved_metadata_key(key)?;
2201    }
2202    Ok(())
2203}
2204
2205fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2206    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2207        return Err(crate::RedDBError::Query(format!(
2208            "edge label '{}' is reserved for managed trees",
2209            TREE_CHILD_EDGE_LABEL
2210        )));
2211    }
2212    Ok(())
2213}
2214
2215impl RedDBRuntime {
2216    pub(crate) fn create_node_unchecked(
2217        &self,
2218        input: CreateNodeInput,
2219    ) -> RedDBResult<CreateEntityOutput> {
2220        let db = self.db();
2221        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2222        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2223        let mut metadata = input.metadata;
2224        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2225        let mut builder = db.node(&input.collection, &input.label);
2226
2227        if let Some(node_type) = input.node_type {
2228            builder = builder.node_type(node_type);
2229        }
2230
2231        for (key, value) in input.properties {
2232            builder = builder.property(key, value);
2233        }
2234
2235        for (key, value) in metadata {
2236            builder = builder.metadata(key, value);
2237        }
2238
2239        for embedding in input.embeddings {
2240            if let Some(model) = embedding.model {
2241                builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2242            } else {
2243                builder = builder.embedding(embedding.name, embedding.vector);
2244            }
2245        }
2246
2247        for link in input.table_links {
2248            builder = builder.link_to_table(link.key, link.table);
2249        }
2250
2251        for link in input.node_links {
2252            builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2253        }
2254
2255        let id = builder.save()?;
2256        // Phase 1.1 MVCC universal: stamp xmin so concurrent snapshots
2257        // don't see this node until the transaction commits.
2258        self.stamp_xmin_if_in_txn(&input.collection, id);
2259        refresh_context_index(&db, &input.collection, id)?;
2260        self.cdc_emit(
2261            crate::replication::cdc::ChangeOperation::Insert,
2262            &input.collection,
2263            id.raw(),
2264            "graph_node",
2265        );
2266        Ok(CreateEntityOutput {
2267            id,
2268            entity: db.store().get(&input.collection, id),
2269        })
2270    }
2271
2272    pub(crate) fn create_edge_unchecked(
2273        &self,
2274        input: CreateEdgeInput,
2275    ) -> RedDBResult<CreateEntityOutput> {
2276        let db = self.db();
2277        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2278        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2279        let mut metadata = input.metadata;
2280        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2281        let mut builder = db
2282            .edge(&input.collection, &input.label)
2283            .from(input.from)
2284            .to(input.to);
2285
2286        if let Some(weight) = input.weight {
2287            builder = builder.weight(weight);
2288        }
2289
2290        for (key, value) in input.properties {
2291            builder = builder.property(key, value);
2292        }
2293
2294        for (key, value) in metadata {
2295            builder = builder.metadata(key, value);
2296        }
2297
2298        let id = builder.save()?;
2299        // Phase 1.1 MVCC universal: stamp xmin on the edge so other
2300        // sessions don't follow it until COMMIT.
2301        self.stamp_xmin_if_in_txn(&input.collection, id);
2302        refresh_context_index(&db, &input.collection, id)?;
2303        self.cdc_emit(
2304            crate::replication::cdc::ChangeOperation::Insert,
2305            &input.collection,
2306            id.raw(),
2307            "graph_edge",
2308        );
2309        Ok(CreateEntityOutput {
2310            id,
2311            entity: db.store().get(&input.collection, id),
2312        })
2313    }
2314}
2315
2316fn create_rows_batch_prevalidated_columnar_with_outputs(
2317    runtime: &RedDBRuntime,
2318    collection: String,
2319    column_names: std::sync::Arc<Vec<String>>,
2320    rows: Vec<Vec<crate::storage::schema::Value>>,
2321) -> RedDBResult<Vec<CreateEntityOutput>> {
2322    use crate::storage::{
2323        unified::{EntityData, EntityKind, RowData},
2324        EntityId, UnifiedEntity,
2325    };
2326    use std::sync::Arc;
2327
2328    if rows.is_empty() {
2329        return Ok(Vec::new());
2330    }
2331    runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2332    runtime.check_batch_size(rows.len())?;
2333    runtime.check_db_size()?;
2334
2335    let db = runtime.db();
2336    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2337    contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2338
2339    let store = db.store();
2340    let table_arc: Arc<str> = Arc::from(collection.as_str());
2341
2342    let indexed_cols = runtime
2343        .index_store_ref()
2344        .indexed_columns_set(collection.as_str());
2345    let has_secondary_indexes = !indexed_cols.is_empty();
2346    let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2347        if has_secondary_indexes {
2348            Vec::with_capacity(rows.len())
2349        } else {
2350            Vec::new()
2351        };
2352
2353    let entities: Vec<UnifiedEntity> = rows
2354        .into_iter()
2355        .map(|values| {
2356            if has_secondary_indexes {
2357                let mut snap: Vec<(String, crate::storage::schema::Value)> =
2358                    Vec::with_capacity(indexed_cols.len());
2359                for (name, val) in column_names.iter().zip(values.iter()) {
2360                    if indexed_cols.contains(name) {
2361                        snap.push((name.clone(), val.clone()));
2362                    }
2363                }
2364                field_snapshots.push(snap);
2365            }
2366            let mut row = RowData::new(values);
2367            row.schema = Some(Arc::clone(&column_names));
2368            UnifiedEntity::new(
2369                EntityId::new(0),
2370                EntityKind::TableRow {
2371                    table: Arc::clone(&table_arc),
2372                    row_id: 0,
2373                },
2374                EntityData::Row(row),
2375            )
2376        })
2377        .collect();
2378
2379    let ids = store
2380        .bulk_insert(&collection, entities)
2381        .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2382
2383    if has_secondary_indexes {
2384        let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2385            .iter()
2386            .zip(field_snapshots)
2387            .map(|(id, fields)| (*id, fields))
2388            .collect();
2389        runtime
2390            .index_store_ref()
2391            .index_entity_insert_batch(&collection, &index_rows)
2392            .map_err(crate::RedDBError::Internal)?;
2393    }
2394
2395    runtime.invalidate_result_cache();
2396    runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2397
2398    Ok(ids
2399        .into_iter()
2400        .map(|id| CreateEntityOutput { id, entity: None })
2401        .collect())
2402}
2403
2404impl RuntimeEntityPort for RedDBRuntime {
2405    fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2406        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2407        let db = self.db();
2408        let CreateRowInput {
2409            collection,
2410            fields,
2411            metadata: input_metadata,
2412            node_links,
2413            vector_links,
2414        } = input;
2415        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2416        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2417        let mut metadata = input_metadata;
2418        apply_collection_default_ttl(&db, &collection, &mut metadata);
2419        let fields = contract.normalize_insert_fields(fields)?;
2420        contract.enforce_row_uniqueness(&fields, None)?;
2421        // Route through MutationEngine for unified hot path.
2422        let engine = self.mutation_engine();
2423        let result = engine.apply(
2424            collection.clone(),
2425            vec![crate::runtime::mutation::MutationRow {
2426                fields,
2427                metadata,
2428                node_links,
2429                vector_links,
2430            }],
2431        )?;
2432        let id = result.ids[0];
2433        // Perf: `db.get(id)` does a *cross-collection* scan (get_any)
2434        // that also takes a write lock on the entity cache. We know
2435        // the collection — hit the manager directly. Cuts
2436        // create_row() p50 roughly in half on the hot path.
2437        Ok(CreateEntityOutput {
2438            id,
2439            entity: db.store().get(&collection, id),
2440        })
2441    }
2442
2443    fn create_rows_batch(
2444        &self,
2445        input: CreateRowsBatchInput,
2446    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2447        if input.rows.is_empty() {
2448            return Ok(Vec::new());
2449        }
2450        self.check_batch_size(input.rows.len())?;
2451        self.check_db_size()?;
2452
2453        let db = self.db();
2454        let collection = input.collection;
2455        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2456        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2457
2458        let mut prepared_rows = Vec::with_capacity(input.rows.len());
2459        let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2460        for row in input.rows {
2461            if row.collection != collection {
2462                return Err(crate::RedDBError::Query(format!(
2463                    "batch row collection mismatch: expected '{}', got '{}'",
2464                    collection, row.collection
2465                )));
2466            }
2467
2468            let mut metadata = row.metadata;
2469            apply_collection_default_ttl(&db, &collection, &mut metadata);
2470            let fields = contract.normalize_insert_fields(row.fields)?;
2471            contract.enforce_row_uniqueness(&fields, None)?;
2472            uniqueness_rows.push(fields.clone());
2473            prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2474        }
2475
2476        contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2477
2478        // Route through MutationEngine: single bulk_insert + one CDC batch
2479        // instead of N separate cdc_emit() calls (each acquires a write lock).
2480        let engine = {
2481            let e = self.mutation_engine();
2482            if input.suppress_events {
2483                e.with_suppress_events()
2484            } else {
2485                e
2486            }
2487        };
2488        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2489            .into_iter()
2490            .map(|(fields, metadata, node_links, vector_links)| {
2491                crate::runtime::mutation::MutationRow {
2492                    fields,
2493                    metadata,
2494                    node_links,
2495                    vector_links,
2496                }
2497            })
2498            .collect();
2499
2500        let result = engine
2501            .apply(collection.clone(), mutation_rows)
2502            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2503
2504        let store = db.store();
2505        Ok(result
2506            .ids
2507            .into_iter()
2508            .map(|id| CreateEntityOutput {
2509                id,
2510                entity: store.get(&collection, id),
2511            })
2512            .collect())
2513    }
2514
2515    fn create_rows_batch_prevalidated_columnar(
2516        &self,
2517        collection: String,
2518        column_names: std::sync::Arc<Vec<String>>,
2519        rows: Vec<Vec<crate::storage::schema::Value>>,
2520    ) -> RedDBResult<usize> {
2521        create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2522            .map(|outputs| outputs.len())
2523    }
2524
2525    fn create_rows_batch_columnar(
2526        &self,
2527        collection: String,
2528        column_names: std::sync::Arc<Vec<String>>,
2529        rows: Vec<Vec<crate::storage::schema::Value>>,
2530    ) -> RedDBResult<usize> {
2531        self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2532            .map(|outputs| outputs.len())
2533    }
2534
2535    fn create_rows_batch_columnar_with_outputs(
2536        &self,
2537        collection: String,
2538        column_names: std::sync::Arc<Vec<String>>,
2539        rows: Vec<Vec<crate::storage::schema::Value>>,
2540    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2541        if rows.is_empty() {
2542            return Ok(Vec::new());
2543        }
2544        self.check_batch_size(rows.len())?;
2545        self.check_db_size()?;
2546
2547        let db = self.db();
2548        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2549        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2550
2551        // Fast path: when the collection carries no contract (or the
2552        // contract has no declared columns) `normalize_insert_fields`
2553        // is a no-op and `enforce_row_uniqueness` finds no unique
2554        // columns to check. Skip the per-row `(String, Value)` tuple
2555        // materialisation entirely and route through the prevalidated
2556        // columnar kernel — same write, CDC, and index path, just
2557        // without the wasted (String, Value) clones. This is the
2558        // bench `bench_users` shape (no contract declared by the
2559        // adapter's `setup_schema`).
2560        let needs_normalisation = match db.collection_contract(&collection) {
2561            Some(c) => {
2562                c.declared_model == crate::catalog::CollectionModel::Table
2563                    && (!c.declared_columns.is_empty()
2564                        || c.table_def
2565                            .as_ref()
2566                            .map(|t| !t.columns.is_empty())
2567                            .unwrap_or(false))
2568            }
2569            None => false,
2570        };
2571        if !needs_normalisation {
2572            return create_rows_batch_prevalidated_columnar_with_outputs(
2573                self,
2574                collection,
2575                column_names,
2576                rows,
2577            );
2578        }
2579
2580        // Slow path: contract requires per-row normalisation /
2581        // uniqueness checks. Materialise `Vec<(String, Value)>` from
2582        // the columnar layout (this still pays N×ncols `String::clone`
2583        // — but it's deferred out of the wire decoder hot loop and
2584        // happens behind a runtime API, not the per-row decode loop)
2585        // and fall through to the existing `create_rows_batch` path.
2586        let ncols = column_names.len();
2587        let tuple_rows: Vec<CreateRowInput> = rows
2588            .into_iter()
2589            .map(|values| {
2590                let mut fields: Vec<(String, crate::storage::schema::Value)> =
2591                    Vec::with_capacity(ncols);
2592                for (name, value) in column_names.iter().zip(values) {
2593                    fields.push((name.clone(), value));
2594                }
2595                CreateRowInput {
2596                    collection: collection.clone(),
2597                    fields,
2598                    metadata: Vec::new(),
2599                    node_links: Vec::new(),
2600                    vector_links: Vec::new(),
2601                }
2602            })
2603            .collect();
2604        self.create_rows_batch(CreateRowsBatchInput {
2605            collection,
2606            rows: tuple_rows,
2607            suppress_events: false,
2608        })
2609    }
2610
2611    fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2612        if input.rows.is_empty() {
2613            return Ok(0);
2614        }
2615        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2616        self.check_batch_size(input.rows.len())?;
2617        self.check_db_size()?;
2618
2619        let db = self.db();
2620        let collection = input.collection;
2621        // Still verify the collection's declared model before we blast
2622        // rows at it — this one-off check is O(1), independent of
2623        // ncols, and catches schema-kind mismatches that the client
2624        // can't always see.
2625        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2626        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2627
2628        // Hoist the per-collection default TTL lookup out of the
2629        // per-row loop — it only depends on the collection, not on
2630        // individual rows, and the old path did one HashMap read per
2631        // row (25k reads for a 25k typed_insert bulk). Fetch it once,
2632        // apply to any row whose metadata doesn't already carry a TTL.
2633        let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2634
2635        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2636            Vec::with_capacity(input.rows.len());
2637        let mut mutation_rows = mutation_rows;
2638        for row in input.rows {
2639            if row.collection != collection {
2640                return Err(crate::RedDBError::Query(format!(
2641                    "batch row collection mismatch: expected '{}', got '{}'",
2642                    collection, row.collection
2643                )));
2644            }
2645            let mut metadata = row.metadata;
2646            if let Some(ttl) = default_ttl_ms {
2647                if !has_internal_ttl_metadata(&metadata) {
2648                    metadata.push((
2649                        "_ttl_ms".to_string(),
2650                        if ttl <= i64::MAX as u64 {
2651                            MetadataValue::Int(ttl as i64)
2652                        } else {
2653                            MetadataValue::Timestamp(ttl)
2654                        },
2655                    ));
2656                }
2657            }
2658            mutation_rows.push(crate::runtime::mutation::MutationRow {
2659                fields: row.fields,
2660                metadata,
2661                node_links: row.node_links,
2662                vector_links: row.vector_links,
2663            });
2664        }
2665
2666        let engine = self.mutation_engine();
2667        let result = engine
2668            .apply(collection, mutation_rows)
2669            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2670        Ok(result.ids.len())
2671    }
2672
2673    fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2674        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2675        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2676            input.properties.iter().map(|(key, _)| key.as_str()),
2677            &format!("node '{}'", input.collection),
2678        )?;
2679        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2680        self.create_node_unchecked(input)
2681    }
2682
2683    fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2684        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2685        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2686            input.properties.iter().map(|(key, _)| key.as_str()),
2687            &format!("edge '{}'", input.collection),
2688        )?;
2689        ensure_non_tree_structural_edge_label(&input.label)?;
2690        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2691        self.create_edge_unchecked(input)
2692    }
2693
2694    fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2695        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2696        let db = self.db();
2697        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2698        contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2699        ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2700        let mut metadata = input.metadata;
2701        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2702        let mut builder = db.vector(&input.collection).dense(input.dense);
2703
2704        if let Some(content) = input.content {
2705            builder = builder.content(content);
2706        }
2707
2708        for (key, value) in metadata {
2709            builder = builder.metadata(key, value);
2710        }
2711
2712        if let Some(link_row) = input.link_row {
2713            builder = builder.link_to_table(link_row);
2714        }
2715
2716        if let Some(link_node) = input.link_node {
2717            builder = builder.link_to_node(link_node);
2718        }
2719
2720        let id = builder.save()?;
2721        // Phase 1.1 MVCC universal: stamp xmin on the vector so
2722        // concurrent ANN scans hide it until the transaction commits.
2723        self.stamp_xmin_if_in_txn(&input.collection, id);
2724        refresh_context_index(&db, &input.collection, id)?;
2725        self.cdc_emit(
2726            crate::replication::cdc::ChangeOperation::Insert,
2727            &input.collection,
2728            id.raw(),
2729            "vector",
2730        );
2731        Ok(CreateEntityOutput {
2732            id,
2733            entity: db.store().get(&input.collection, id),
2734        })
2735    }
2736
2737    fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2738        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2739        let db = self.db();
2740        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2741        contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2742
2743        if let JsonValue::Object(ref map) = input.body {
2744            crate::reserved_fields::ensure_no_reserved_public_item_fields(
2745                map.keys().map(String::as_str),
2746                &format!("document '{}'", input.collection),
2747            )?;
2748        }
2749
2750        // Serialize the full body as Value::Json for the "body" field
2751        let body_bytes = json_to_vec(&input.body).map_err(|err| {
2752            crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2753        })?;
2754        let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2755            "body".to_string(),
2756            crate::storage::schema::Value::Json(body_bytes),
2757        )];
2758
2759        // Flatten top-level keys from the body into named fields for filtering
2760        if let JsonValue::Object(ref map) = input.body {
2761            for (key, value) in map {
2762                let storage_value = json_to_storage_value(value)?;
2763                fields.push((key.clone(), storage_value));
2764            }
2765        }
2766
2767        let mut metadata = input.metadata;
2768        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2769        let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2770            .iter()
2771            .map(|(key, value)| (key.as_str(), value.clone()))
2772            .collect();
2773        let mut builder = db.row(&input.collection, columns);
2774
2775        for (key, value) in metadata {
2776            builder = builder.metadata(key, value);
2777        }
2778
2779        for node in input.node_links {
2780            builder = builder.link_to_node(node);
2781        }
2782
2783        for vector in input.vector_links {
2784            builder = builder.link_to_vector(vector);
2785        }
2786
2787        let id = builder.save()?;
2788        // Phase 1.1 MVCC universal: stamp xmin on the document.
2789        self.stamp_xmin_if_in_txn(&input.collection, id);
2790        refresh_context_index(&db, &input.collection, id)?;
2791        self.cdc_emit(
2792            crate::replication::cdc::ChangeOperation::Insert,
2793            &input.collection,
2794            id.raw(),
2795            "document",
2796        );
2797        Ok(CreateEntityOutput {
2798            id,
2799            entity: db.store().get(&input.collection, id),
2800        })
2801    }
2802
2803    fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2804        let db = self.db();
2805        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2806        let declared_model = db
2807            .collection_contract(&input.collection)
2808            .map(|contract| contract.declared_model);
2809        let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2810            contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2811            self.seal_vault_value(&input.collection, input.value)?
2812        } else {
2813            contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2814            input.value
2815        };
2816        let fields = vec![
2817            (
2818                "key".to_string(),
2819                crate::storage::schema::Value::text(input.key),
2820            ),
2821            ("value".to_string(), value),
2822        ];
2823        let collection = input.collection;
2824        let result = self.mutation_engine().apply(
2825            collection.clone(),
2826            vec![crate::runtime::mutation::MutationRow {
2827                fields,
2828                metadata: input.metadata,
2829                node_links: Vec::new(),
2830                vector_links: Vec::new(),
2831            }],
2832        )?;
2833        let id = result.ids[0];
2834        Ok(CreateEntityOutput {
2835            id,
2836            entity: db.store().get(&collection, id),
2837        })
2838    }
2839
2840    fn create_timeseries_point(
2841        &self,
2842        input: CreateTimeSeriesPointInput,
2843    ) -> RedDBResult<CreateEntityOutput> {
2844        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2845        let db = self.db();
2846        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2847        contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2848
2849        let mut fields = vec![
2850            (
2851                "metric".to_string(),
2852                crate::storage::schema::Value::text(input.metric),
2853            ),
2854            (
2855                "value".to_string(),
2856                crate::storage::schema::Value::Float(input.value),
2857            ),
2858        ];
2859
2860        if let Some(timestamp_ns) = input.timestamp_ns {
2861            fields.push((
2862                "timestamp".to_string(),
2863                crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2864            ));
2865        }
2866
2867        if !input.tags.is_empty() {
2868            let tags_json = JsonValue::Object(
2869                input
2870                    .tags
2871                    .into_iter()
2872                    .map(|(key, value)| (key, JsonValue::String(value)))
2873                    .collect(),
2874            );
2875            let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2876                crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2877            })?;
2878            fields.push((
2879                "tags".to_string(),
2880                crate::storage::schema::Value::Json(tags_bytes),
2881            ));
2882        }
2883
2884        let collection = input.collection;
2885        let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2886        // Phase 1.1 MVCC universal: stamp xmin on the point so
2887        // concurrent range scans hide it until COMMIT.
2888        self.stamp_xmin_if_in_txn(&collection, id);
2889        refresh_context_index(&db, &collection, id)?;
2890
2891        Ok(CreateEntityOutput {
2892            id,
2893            entity: db.store().get(&collection, id),
2894        })
2895    }
2896
2897    fn get_kv(
2898        &self,
2899        collection: &str,
2900        key: &str,
2901    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2902        let db = self.db();
2903        ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2904        let store = db.store();
2905        let Some(manager) = store.get_collection(collection) else {
2906            return Ok(None);
2907        };
2908        let entities = manager.query_all(|_| true);
2909        for entity in entities {
2910            if let crate::storage::EntityData::Row(ref row) = entity.data {
2911                if let Some(ref named) = row.named {
2912                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2913                        if &**k == key {
2914                            let value = named
2915                                .get("value")
2916                                .cloned()
2917                                .unwrap_or(crate::storage::schema::Value::Null);
2918                            return Ok(Some((value, entity.id)));
2919                        }
2920                    }
2921                }
2922            }
2923        }
2924        Ok(None)
2925    }
2926
2927    fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2928        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2929        let found = self.get_kv(collection, key)?;
2930        if let Some((_, id)) = found {
2931            let db = self.db();
2932            let store = db.store();
2933            let deleted = store
2934                .delete(collection, id)
2935                .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2936            if deleted {
2937                store.context_index().remove_entity(id);
2938            }
2939            Ok(deleted)
2940        } else {
2941            Ok(false)
2942        }
2943    }
2944
2945    fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2946        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2947        let PatchEntityInput {
2948            collection,
2949            id,
2950            payload,
2951            operations,
2952        } = input;
2953        let db = self.db();
2954        let store = db.store();
2955        let Some(manager) = store.get_collection(&collection) else {
2956            return Err(crate::RedDBError::NotFound(format!(
2957                "collection not found: {collection}"
2958            )));
2959        };
2960        let Some(entity) = manager.get(id) else {
2961            return Err(crate::RedDBError::NotFound(format!(
2962                "entity not found: {}",
2963                id.raw()
2964            )));
2965        };
2966        self.apply_loaded_patch_entity(collection, entity, payload, operations)
2967    }
2968
2969    fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
2970        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2971        let store = self.db().store();
2972        // Snapshot row fields before delete so we can mirror the removal
2973        // into every secondary index. The fetch is best-effort: if the
2974        // entity is already gone, the delete below is a no-op anyway.
2975        let pre_delete_fields = store
2976            .get(&input.collection, input.id)
2977            .as_ref()
2978            .map(entity_row_fields_snapshot)
2979            .unwrap_or_default();
2980        // Store delete first (source of truth). Crash between here and index removal
2981        // leaves the entity invisible to most queries but recoverable; the reverse
2982        // (remove index first, then crash) leaves the entity permanently orphaned.
2983        let deleted = store
2984            .delete(&input.collection, input.id)
2985            .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2986        if deleted {
2987            store.context_index().remove_entity(input.id);
2988            // Secondary index maintenance — surface only registry-shape
2989            // errors; missing-index removals are tolerated inside the call.
2990            if !pre_delete_fields.is_empty() {
2991                self.index_store_ref()
2992                    .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
2993                    .map_err(crate::RedDBError::Internal)?;
2994            }
2995            self.cdc_emit(
2996                crate::replication::cdc::ChangeOperation::Delete,
2997                &input.collection,
2998                input.id.raw(),
2999                "entity",
3000            );
3001        }
3002        Ok(DeleteEntityOutput {
3003            deleted,
3004            id: input.id,
3005        })
3006    }
3007}