Skip to main content

reddb_server/application/
ports_impls_entity.rs

1use std::collections::HashMap;
2
3use crate::application::entity::{
4    AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5    RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8    has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21    db: &crate::storage::unified::devx::RedDB,
22    collection: &str,
23    metadata: &mut Vec<(String, MetadataValue)>,
24) {
25    if has_internal_ttl_metadata(metadata) {
26        return;
27    }
28
29    let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30        return;
31    };
32
33    metadata.push((
34        "_ttl_ms".to_string(),
35        if default_ttl_ms <= i64::MAX as u64 {
36            MetadataValue::Int(default_ttl_ms as i64)
37        } else {
38            MetadataValue::Timestamp(default_ttl_ms)
39        },
40    ));
41}
42
43fn refresh_context_index(
44    db: &crate::storage::unified::devx::RedDB,
45    collection: &str,
46    id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48    let store = db.store();
49    let Some(entity) = store.get(collection, id) else {
50        return Ok(());
51    };
52
53    store.context_index().index_entity(collection, &entity);
54    Ok(())
55}
56
57/// Pull `(name, value)` pairs for every named column on a row entity.
58/// Returns empty if the entity is not a row, or if the row carries
59/// neither a `named` map nor a `schema` Arc — both of those mean the
60/// names aren't recoverable here, so secondary-index maintenance has
61/// nothing to act on. Used by the delete + update paths.
62pub(crate) fn entity_row_fields_snapshot(
63    entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65    let crate::storage::EntityData::Row(row) = &entity.data else {
66        return Vec::new();
67    };
68    if let Some(named) = &row.named {
69        return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70    }
71    if let Some(schema) = &row.schema {
72        return schema
73            .iter()
74            .zip(row.columns.iter())
75            .map(|(name, value)| (name.clone(), value.clone()))
76            .collect();
77    }
78    Vec::new()
79}
80
81fn ensure_collection_model_contract(
82    db: &crate::storage::unified::devx::RedDB,
83    collection: &str,
84    requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86    if let Some(contract) = db.collection_contract(collection) {
87        if !contract_enforces_model(&contract) {
88            return Ok(());
89        }
90        if collection_model_allows(contract.declared_model, requested_model) {
91            return Ok(());
92        }
93        // A collection that declares an EMBED policy (#1271/#1272) — or a
94        // VISION policy with an image-embedding output (#1275) — holds its
95        // rows' enrichment vectors in place: the CDC enrichment consumer
96        // attaches vectors into the same collection so `VECTOR SEARCH` over
97        // it surfaces the enriched rows. Permit vector writes on such a
98        // collection even when it is otherwise a strict table.
99        if requested_model == crate::catalog::CollectionModel::Vector
100            && contract
101                .ai_policy
102                .as_ref()
103                .is_some_and(|policy| policy.embed.is_some() || policy.vision.is_some())
104        {
105            return Ok(());
106        }
107        return Err(crate::RedDBError::InvalidOperation(format!(
108            "collection '{}' is declared as '{}' and does not allow '{}' writes",
109            collection,
110            collection_model_name(contract.declared_model),
111            collection_model_name(requested_model)
112        )));
113    }
114
115    let now = implicit_contract_unix_ms();
116    db.save_collection_contract(crate::physical::CollectionContract {
117        name: collection.to_string(),
118        declared_model: requested_model,
119        schema_mode: crate::catalog::SchemaMode::Dynamic,
120        origin: crate::physical::ContractOrigin::Implicit,
121        version: 1,
122        created_at_unix_ms: now,
123        updated_at_unix_ms: now,
124        default_ttl_ms: db.collection_default_ttl_ms(collection),
125        vector_dimension: None,
126        vector_metric: None,
127        context_index_fields: Vec::new(),
128        declared_columns: Vec::new(),
129        table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
130            .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
131        timestamps_enabled: false,
132        context_index_enabled: false,
133        metrics_raw_retention_ms: None,
134        metrics_rollup_policies: Vec::new(),
135        metrics_tenant_identity: None,
136        metrics_namespace: None,
137        // Implicit contracts are created on first write — mutability
138        // is the default until the operator runs explicit DDL.
139        append_only: false,
140        subscriptions: Vec::new(),
141        analytics_config: Vec::new(),
142        session_key: None,
143        session_gap_ms: None,
144        retention_duration_ms: None,
145        analytical_storage: None,
146
147        ai_policy: None,
148    })
149    .map(|_| ())
150    .map_err(|err| crate::RedDBError::Internal(err.to_string()))
151}
152
153fn implicit_contract_unix_ms() -> u128 {
154    std::time::SystemTime::now()
155        .duration_since(std::time::UNIX_EPOCH)
156        .unwrap_or_default()
157        .as_millis()
158}
159
160fn collection_model_allows(
161    declared_model: crate::catalog::CollectionModel,
162    requested_model: crate::catalog::CollectionModel,
163) -> bool {
164    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
165}
166
167fn ensure_vector_dimension_contract(
168    db: &crate::storage::unified::devx::RedDB,
169    collection: &str,
170    actual_dimension: usize,
171) -> RedDBResult<()> {
172    let Some(expected_dimension) = db
173        .collection_contract(collection)
174        .and_then(|contract| contract.vector_dimension)
175    else {
176        return Ok(());
177    };
178    if expected_dimension == actual_dimension {
179        return Ok(());
180    }
181    Err(crate::RedDBError::Query(format!(
182        "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
183    )))
184}
185
186fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
187    match model {
188        crate::catalog::CollectionModel::Table => "table",
189        crate::catalog::CollectionModel::Document => "document",
190        crate::catalog::CollectionModel::Graph => "graph",
191        crate::catalog::CollectionModel::Vector => "vector",
192        crate::catalog::CollectionModel::Hll => "hll",
193        crate::catalog::CollectionModel::Sketch => "sketch",
194        crate::catalog::CollectionModel::Filter => "filter",
195        crate::catalog::CollectionModel::Kv => "kv",
196        crate::catalog::CollectionModel::Config => "config",
197        crate::catalog::CollectionModel::Vault => "vault",
198        crate::catalog::CollectionModel::Mixed => "mixed",
199        crate::catalog::CollectionModel::TimeSeries => "timeseries",
200        crate::catalog::CollectionModel::Queue => "queue",
201        crate::catalog::CollectionModel::Metrics => "metrics",
202    }
203}
204
205#[derive(Clone)]
206struct UniquenessRule {
207    name: String,
208    columns: Vec<String>,
209    primary_key: bool,
210}
211
212#[derive(Copy, Clone, PartialEq, Eq)]
213enum NormalizeMode {
214    /// First write for this row. Timestamps auto-filled from now on
215    /// both `created_at` and `updated_at`; user attempts to set
216    /// either column are rejected.
217    Insert,
218    /// Update/patch path. `created_at` is preserved from the existing
219    /// row (immutable after insert); `updated_at` is bumped to now.
220    /// User attempts to set either via the patch are rejected.
221    Update,
222}
223
224mod collection_contract_enforcement {
225    use super::*;
226
227    pub(super) struct CollectionContractWriteEnforcer<'a> {
228        db: &'a crate::storage::unified::devx::RedDB,
229        collection: &'a str,
230    }
231
232    impl<'a> CollectionContractWriteEnforcer<'a> {
233        pub(super) fn new(
234            db: &'a crate::storage::unified::devx::RedDB,
235            collection: &'a str,
236        ) -> Self {
237            Self { db, collection }
238        }
239
240        pub(super) fn ensure_model(
241            &self,
242            requested_model: crate::catalog::CollectionModel,
243        ) -> RedDBResult<()> {
244            ensure_collection_model_contract(self.db, self.collection, requested_model)
245        }
246
247        pub(super) fn normalize_insert_fields(
248            &self,
249            fields: Vec<(String, Value)>,
250        ) -> RedDBResult<Vec<(String, Value)>> {
251            normalize_row_fields_for_contract_with_mode(
252                self.db,
253                self.collection,
254                fields,
255                NormalizeMode::Insert,
256            )
257        }
258
259        pub(super) fn normalize_update_fields(
260            &self,
261            fields: Vec<(String, Value)>,
262        ) -> RedDBResult<Vec<(String, Value)>> {
263            normalize_row_fields_for_contract_with_mode(
264                self.db,
265                self.collection,
266                fields,
267                NormalizeMode::Update,
268            )
269        }
270
271        pub(super) fn enforce_row_uniqueness(
272            &self,
273            fields: &[(String, Value)],
274            exclude_id: Option<crate::storage::EntityId>,
275        ) -> RedDBResult<()> {
276            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
277        }
278
279        pub(super) fn enforce_batch_uniqueness(
280            &self,
281            rows: &[Vec<(String, Value)>],
282        ) -> RedDBResult<()> {
283            enforce_row_batch_uniqueness(self.db, self.collection, rows)
284        }
285
286        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
287            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
288        }
289    }
290}
291
292use collection_contract_enforcement::CollectionContractWriteEnforcer;
293
294fn normalize_row_fields_for_contract_with_mode(
295    db: &crate::storage::unified::devx::RedDB,
296    collection: &str,
297    fields: Vec<(String, Value)>,
298    mode: NormalizeMode,
299) -> RedDBResult<Vec<(String, Value)>> {
300    let Some(contract) = db.collection_contract(collection) else {
301        return Ok(fields);
302    };
303
304    if contract.declared_model != crate::catalog::CollectionModel::Table
305        || (contract.declared_columns.is_empty()
306            && contract
307                .table_def
308                .as_ref()
309                .map(|table| table.columns.is_empty())
310                .unwrap_or(true))
311    {
312        return Ok(fields);
313    }
314
315    // Capture the pre-normalize value of created_at (if present) so
316    // Update mode can preserve it. Also capture updated_at to detect
317    // user attempts to set it via the patch payload.
318    //
319    // Heuristic for Update mode: if fields ALREADY contains a
320    // `created_at` whose value matches the row's on-disk entity, the
321    // caller is the patch pipeline carrying forward an auto-populated
322    // column — not a user mutation. Allow pass-through in that case,
323    // then restore the original value at the end.
324    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
325        fields
326            .iter()
327            .find(|(n, _)| n == "created_at")
328            .map(|(_, v)| v.clone())
329    } else {
330        None
331    };
332
333    // Reject user attempts to set runtime-managed timestamp columns.
334    // On Insert we reject any mention; on Update we only reject when
335    // the patch pipeline handed us a NEW value (not the one we
336    // auto-populated during the last insert).
337    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
338        for (name, _) in &fields {
339            if name == "created_at" || name == "updated_at" {
340                return Err(crate::RedDBError::Query(format!(
341                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
342                    collection, name
343                )));
344            }
345        }
346    }
347
348    let mut provided = std::collections::BTreeMap::new();
349    for (name, value) in &fields {
350        provided.insert(name.clone(), value.clone());
351    }
352
353    let resolved_columns = resolved_contract_columns(&contract)?;
354    let declared_names: std::collections::BTreeSet<String> = resolved_columns
355        .iter()
356        .map(|column| column.name.clone())
357        .collect();
358    let unknown_fields: Vec<String> = fields
359        .iter()
360        .filter(|(name, _)| !declared_names.contains(name))
361        .map(|(name, _)| name.clone())
362        .collect();
363    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
364        && !unknown_fields.is_empty()
365    {
366        return Err(crate::RedDBError::Query(format!(
367            "collection '{}' is strict and does not allow undeclared fields: {}",
368            collection,
369            unknown_fields.join(", ")
370        )));
371    }
372    let mut normalized = Vec::new();
373    let now_ms = current_unix_ms_u64();
374
375    for column in &resolved_columns {
376        match provided.remove(&column.name) {
377            Some(value) => {
378                // Runtime-managed columns on Update: always overwrite
379                // with the runtime's own value (preserved created_at
380                // or fresh updated_at). User mutations are silently
381                // discarded because we reject them earlier.
382                if contract.timestamps_enabled && mode == NormalizeMode::Update {
383                    match column.name.as_str() {
384                        "created_at" => {
385                            normalized.push((
386                                column.name.clone(),
387                                existing_created_at
388                                    .clone()
389                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
390                            ));
391                            continue;
392                        }
393                        "updated_at" => {
394                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
395                            continue;
396                        }
397                        _ => {}
398                    }
399                }
400                normalized.push((
401                    column.name.clone(),
402                    normalize_contract_value(collection, column, value)?,
403                ));
404            }
405            None => {
406                // Runtime-managed timestamp columns: auto-fill with now
407                // when the contract opted in. Both get the same value on
408                // first insert so callers can order by either.
409                if contract.timestamps_enabled
410                    && (column.name == "created_at" || column.name == "updated_at")
411                {
412                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
413                    continue;
414                }
415                if let Some(default) = &column.default {
416                    normalized.push((
417                        column.name.clone(),
418                        coerce_contract_literal(collection, &column.name, column, default)?,
419                    ));
420                } else if column.not_null {
421                    return Err(crate::RedDBError::Query(format!(
422                        "missing required column '{}' for collection '{}'",
423                        column.name, collection
424                    )));
425                }
426            }
427        }
428    }
429
430    for (name, value) in fields {
431        if !declared_names.contains(&name) {
432            normalized.push((name, value));
433        }
434    }
435
436    Ok(normalized)
437}
438
439fn current_unix_ms_u64() -> u64 {
440    std::time::SystemTime::now()
441        .duration_since(std::time::UNIX_EPOCH)
442        .map(|d| d.as_millis() as u64)
443        .unwrap_or(0)
444}
445
446fn enforce_row_uniqueness(
447    db: &crate::storage::unified::devx::RedDB,
448    collection: &str,
449    fields: &[(String, Value)],
450    exclude_id: Option<crate::storage::EntityId>,
451) -> RedDBResult<()> {
452    let Some(contract) = db.collection_contract(collection) else {
453        return Ok(());
454    };
455    if contract.declared_model != crate::catalog::CollectionModel::Table
456        && contract.declared_model != crate::catalog::CollectionModel::Mixed
457    {
458        return Ok(());
459    }
460
461    let rules = resolved_uniqueness_rules(&contract);
462    if rules.is_empty() {
463        return Ok(());
464    }
465
466    let store = db.store();
467    let Some(manager) = store.get_collection(collection) else {
468        return Ok(());
469    };
470
471    let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
472
473    for rule in &rules {
474        let mut expected_signatures = Vec::new();
475        let mut skip_rule = false;
476
477        for column in &rule.columns {
478            match input_fields.get(column) {
479                Some(Value::Null) | None if rule.primary_key => {
480                    return Err(crate::RedDBError::Query(format!(
481                        "primary key '{}' in collection '{}' requires non-null column '{}'",
482                        rule.name, collection, column
483                    )))
484                }
485                Some(Value::Null) | None => {
486                    skip_rule = true;
487                    break;
488                }
489                Some(value) => {
490                    expected_signatures.push((column.clone(), value_signature(value)));
491                }
492            }
493        }
494
495        if skip_rule {
496            continue;
497        }
498
499        for entity in manager.query_all(|_| true) {
500            if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
501                continue;
502            }
503            let Some(existing_fields) = row_fields_from_entity(&entity) else {
504                continue;
505            };
506
507            let duplicate = expected_signatures.iter().all(|(column, expected)| {
508                existing_fields
509                    .get(column)
510                    .map(|value| value_signature(value) == *expected)
511                    .unwrap_or(false)
512            });
513
514            if duplicate {
515                let qualifier = if rule.primary_key {
516                    "primary key"
517                } else {
518                    "unique constraint"
519                };
520                return Err(crate::RedDBError::Query(format!(
521                    "{} '{}' violated on collection '{}' for columns [{}]",
522                    qualifier,
523                    rule.name,
524                    collection,
525                    rule.columns.join(", ")
526                )));
527            }
528        }
529    }
530
531    Ok(())
532}
533
534fn enforce_row_batch_uniqueness(
535    db: &crate::storage::unified::devx::RedDB,
536    collection: &str,
537    rows: &[Vec<(String, Value)>],
538) -> RedDBResult<()> {
539    let Some(contract) = db.collection_contract(collection) else {
540        return Ok(());
541    };
542    if contract.declared_model != crate::catalog::CollectionModel::Table
543        && contract.declared_model != crate::catalog::CollectionModel::Mixed
544    {
545        return Ok(());
546    }
547
548    let rules = resolved_uniqueness_rules(&contract);
549    if rules.is_empty() {
550        return Ok(());
551    }
552
553    for rule in &rules {
554        let mut seen = std::collections::HashMap::<String, usize>::new();
555        for (row_index, fields) in rows.iter().enumerate() {
556            let input_fields: std::collections::BTreeMap<String, Value> =
557                fields.iter().cloned().collect();
558            let mut signatures = Vec::new();
559            let mut skip_rule = false;
560
561            for column in &rule.columns {
562                match input_fields.get(column) {
563                    Some(Value::Null) | None if rule.primary_key => {
564                        return Err(crate::RedDBError::Query(format!(
565                            "primary key '{}' in collection '{}' requires non-null column '{}'",
566                            rule.name, collection, column
567                        )))
568                    }
569                    Some(Value::Null) | None => {
570                        skip_rule = true;
571                        break;
572                    }
573                    Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
574                }
575            }
576
577            if skip_rule {
578                continue;
579            }
580
581            let signature = signatures.join("|");
582            if let Some(previous_index) = seen.insert(signature, row_index) {
583                return Err(crate::RedDBError::Query(format!(
584                    "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
585                    rule.name,
586                    collection,
587                    previous_index + 1,
588                    row_index + 1
589                )));
590            }
591        }
592    }
593
594    Ok(())
595}
596
597fn row_update_requires_uniqueness_check(
598    db: &crate::storage::unified::devx::RedDB,
599    collection: &str,
600    modified_columns: &[String],
601) -> bool {
602    if modified_columns.is_empty() {
603        return false;
604    }
605
606    let Some(contract) = db.collection_contract(collection) else {
607        return false;
608    };
609    if contract.declared_model != crate::catalog::CollectionModel::Table
610        && contract.declared_model != crate::catalog::CollectionModel::Mixed
611    {
612        return false;
613    }
614
615    let rules = resolved_uniqueness_rules(&contract);
616    if rules.is_empty() {
617        return false;
618    }
619
620    rules.iter().any(|rule| {
621        rule.columns.iter().any(|column| {
622            modified_columns
623                .iter()
624                .any(|modified| modified.eq_ignore_ascii_case(column))
625        })
626    })
627}
628
629pub(crate) fn build_row_update_contract_plan(
630    db: &crate::storage::unified::devx::RedDB,
631    collection: &str,
632) -> RedDBResult<Option<RowUpdateContractPlan>> {
633    let Some(contract) = db.collection_contract(collection) else {
634        return Ok(None);
635    };
636
637    let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
638        && !(contract.declared_columns.is_empty()
639            && contract
640                .table_def
641                .as_ref()
642                .map(|table| table.columns.is_empty())
643                .unwrap_or(true))
644    {
645        resolved_contract_columns(&contract)?
646            .into_iter()
647            .map(|rule| {
648                (
649                    rule.name.clone(),
650                    RowUpdateColumnRule {
651                        name: rule.name,
652                        data_type: rule.data_type,
653                        data_type_name: rule.data_type_name,
654                        not_null: rule.not_null,
655                        enum_variants: rule.enum_variants,
656                    },
657                )
658            })
659            .collect()
660    } else {
661        HashMap::new()
662    };
663
664    let unique_columns = if matches!(
665        contract.declared_model,
666        crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
667    ) {
668        resolved_uniqueness_rules(&contract)
669            .into_iter()
670            .flat_map(|rule| rule.columns.into_iter())
671            .map(|column| (column, ()))
672            .collect()
673    } else {
674        HashMap::new()
675    };
676
677    Ok(Some(RowUpdateContractPlan {
678        timestamps_enabled: contract.timestamps_enabled,
679        strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
680        declared_rules,
681        unique_columns,
682    }))
683}
684
685pub(crate) fn normalize_row_update_assignment_with_plan(
686    collection: &str,
687    column: &str,
688    value: Value,
689    row_contract_plan: Option<&RowUpdateContractPlan>,
690) -> RedDBResult<Value> {
691    let Some(plan) = row_contract_plan else {
692        return Ok(value);
693    };
694
695    if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
696        return Err(crate::RedDBError::Query(format!(
697            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
698            collection, column
699        )));
700    }
701
702    if let Some(rule) = plan.declared_rules.get(column) {
703        let rule = ResolvedColumnRule {
704            name: rule.name.clone(),
705            data_type: rule.data_type,
706            data_type_name: rule.data_type_name.clone(),
707            not_null: rule.not_null,
708            default: None,
709            enum_variants: rule.enum_variants.clone(),
710        };
711        normalize_contract_value(collection, &rule, value)
712    } else if plan.strict_schema {
713        Err(crate::RedDBError::Query(format!(
714            "collection '{}' is strict and does not allow undeclared fields: {}",
715            collection, column
716        )))
717    } else {
718        Ok(value)
719    }
720}
721
722pub(crate) fn normalize_row_update_value_for_rule(
723    collection: &str,
724    value: Value,
725    row_rule: Option<&RowUpdateColumnRule>,
726) -> RedDBResult<Value> {
727    let Some(rule) = row_rule else {
728        return Ok(value);
729    };
730
731    let rule = ResolvedColumnRule {
732        name: rule.name.clone(),
733        data_type: rule.data_type,
734        data_type_name: rule.data_type_name.clone(),
735        not_null: rule.not_null,
736        default: None,
737        enum_variants: rule.enum_variants.clone(),
738    };
739    normalize_contract_value(collection, &rule, value)
740}
741
742fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
743    if let Some(named) = row.named.as_mut() {
744        named.insert(name.to_string(), value);
745        return;
746    }
747
748    if let Some(schema) = row.schema.as_ref() {
749        if let Some(index) = schema.iter().position(|column| column == name) {
750            if let Some(slot) = row.columns.get_mut(index) {
751                *slot = value;
752                return;
753            }
754        }
755
756        let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
757        for (column, current) in schema.iter().zip(row.columns.iter()) {
758            named.insert(column.clone(), current.clone());
759        }
760        named.insert(name.to_string(), value);
761        row.named = Some(named);
762        return;
763    }
764
765    let mut named = HashMap::with_capacity(1);
766    named.insert(name.to_string(), value);
767    row.named = Some(named);
768}
769
770fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
771    if let Some(named) = row.named.as_ref() {
772        named
773            .iter()
774            .map(|(key, value)| (key.clone(), value.clone()))
775            .collect()
776    } else if let Some(schema) = row.schema.as_ref() {
777        schema
778            .iter()
779            .cloned()
780            .zip(row.columns.iter().cloned())
781            .collect()
782    } else {
783        Vec::new()
784    }
785}
786
787fn apply_row_field_assignments_raw<I>(
788    row: &mut crate::storage::unified::entity::RowData,
789    field_assignments: I,
790) where
791    I: IntoIterator<Item = (String, Value)>,
792{
793    for (column, value) in field_assignments {
794        set_row_field(row, &column, value);
795    }
796}
797
798fn apply_row_field_assignments_incremental<I>(
799    collection: &str,
800    row: &mut crate::storage::unified::entity::RowData,
801    field_assignments: I,
802    row_contract_plan: Option<&RowUpdateContractPlan>,
803) -> RedDBResult<()>
804where
805    I: IntoIterator<Item = (String, Value)>,
806{
807    for (column, value) in field_assignments {
808        let value = normalize_row_update_assignment_with_plan(
809            collection,
810            &column,
811            value,
812            row_contract_plan,
813        )?;
814
815        set_row_field(row, &column, value);
816    }
817
818    Ok(())
819}
820
821fn resolved_uniqueness_rules(
822    contract: &crate::physical::CollectionContract,
823) -> Vec<UniquenessRule> {
824    let mut rules = Vec::new();
825
826    if let Some(table_def) = &contract.table_def {
827        if !table_def.primary_key.is_empty() {
828            rules.push(UniquenessRule {
829                name: "primary_key".to_string(),
830                columns: table_def.primary_key.clone(),
831                primary_key: true,
832            });
833        }
834
835        for constraint in &table_def.constraints {
836            if matches!(
837                constraint.constraint_type,
838                crate::storage::schema::ConstraintType::PrimaryKey
839            ) && !constraint.columns.is_empty()
840            {
841                rules.push(UniquenessRule {
842                    name: constraint.name.clone(),
843                    columns: constraint.columns.clone(),
844                    primary_key: true,
845                });
846            } else if matches!(
847                constraint.constraint_type,
848                crate::storage::schema::ConstraintType::Unique
849            ) && !constraint.columns.is_empty()
850            {
851                rules.push(UniquenessRule {
852                    name: constraint.name.clone(),
853                    columns: constraint.columns.clone(),
854                    primary_key: false,
855                });
856            }
857        }
858    } else {
859        for column in &contract.declared_columns {
860            if column.primary_key {
861                rules.push(UniquenessRule {
862                    name: format!("pk_{}", column.name),
863                    columns: vec![column.name.clone()],
864                    primary_key: true,
865                });
866            } else if column.unique {
867                rules.push(UniquenessRule {
868                    name: format!("uniq_{}", column.name),
869                    columns: vec![column.name.clone()],
870                    primary_key: false,
871                });
872            }
873        }
874    }
875
876    let mut dedup = std::collections::BTreeSet::new();
877    rules
878        .into_iter()
879        .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
880        .collect()
881}
882
883fn row_fields_from_entity(
884    entity: &crate::storage::UnifiedEntity,
885) -> Option<std::collections::BTreeMap<String, Value>> {
886    match &entity.data {
887        crate::storage::EntityData::Row(row) => {
888            if let Some(named) = &row.named {
889                Some(
890                    named
891                        .iter()
892                        .map(|(key, value)| (key.clone(), value.clone()))
893                        .collect(),
894                )
895            } else {
896                row.schema.as_ref().map(|schema| {
897                    schema
898                        .iter()
899                        .cloned()
900                        .zip(row.columns.iter().cloned())
901                        .collect()
902                })
903            }
904        }
905        _ => None,
906    }
907}
908
909fn value_signature(value: &Value) -> String {
910    format!("{value:?}")
911}
912
913fn normalize_contract_value(
914    collection: &str,
915    column: &ResolvedColumnRule,
916    value: Value,
917) -> RedDBResult<Value> {
918    if matches!(value, Value::Null) {
919        if column.not_null {
920            return Err(crate::RedDBError::Query(format!(
921                "column '{}' in collection '{}' cannot be null",
922                column.name, collection
923            )));
924        }
925        return Ok(Value::Null);
926    }
927
928    let target = column.data_type;
929    if value_matches_declared_type(&value, target) {
930        return Ok(value);
931    }
932
933    let Some(raw) = value_to_coercion_input(&value) else {
934        return Err(crate::RedDBError::Query(format!(
935            "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
936            column.name, collection, column.data_type_name
937        )));
938    };
939
940    coerce_contract_literal(collection, &column.name, column, &raw)
941}
942
943fn coerce_contract_literal(
944    collection: &str,
945    column_name: &str,
946    column: &ResolvedColumnRule,
947    raw: &str,
948) -> RedDBResult<Value> {
949    let target = column.data_type;
950    match target {
951        DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
952        DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
953        DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
954            crate::RedDBError::Query(format!(
955                "failed to coerce column '{}' in collection '{}' to '{}': {}",
956                column_name, collection, column.data_type_name, err
957            ))
958        }),
959        DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
960            crate::RedDBError::Query(format!(
961                "failed to coerce column '{}' in collection '{}' to '{}': {}",
962                column_name, collection, column.data_type_name, err
963            ))
964        }),
965        DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
966            "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
967            column_name, collection, column.data_type_name
968        ))),
969        _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
970            |err| {
971                crate::RedDBError::Query(format!(
972                    "failed to coerce column '{}' in collection '{}' to '{}': {}",
973                    column_name, collection, column.data_type_name, err
974                ))
975            },
976        ),
977    }
978}
979
980struct ResolvedColumnRule {
981    name: String,
982    data_type: DataType,
983    data_type_name: String,
984    not_null: bool,
985    default: Option<String>,
986    enum_variants: Vec<String>,
987}
988
989fn resolved_contract_columns(
990    contract: &crate::physical::CollectionContract,
991) -> RedDBResult<Vec<ResolvedColumnRule>> {
992    if let Some(table_def) = &contract.table_def {
993        return Ok(table_def
994            .columns
995            .iter()
996            .map(|column| ResolvedColumnRule {
997                name: column.name.clone(),
998                data_type: column.data_type,
999                data_type_name: data_type_name(column.data_type).to_string(),
1000                not_null: !column.nullable,
1001                default: column
1002                    .default
1003                    .as_ref()
1004                    .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
1005                enum_variants: column.enum_variants.clone(),
1006            })
1007            .collect());
1008    }
1009
1010    contract
1011        .declared_columns
1012        .iter()
1013        .map(|column| {
1014            let data_type = column
1015                .sql_type
1016                .as_ref()
1017                .map(crate::storage::query::resolve_sql_type_name)
1018                .transpose()
1019                .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1020                .unwrap_or(parse_declared_data_type(&column.data_type)?);
1021            Ok(ResolvedColumnRule {
1022                name: column.name.clone(),
1023                data_type,
1024                data_type_name: column.data_type.clone(),
1025                not_null: column.not_null,
1026                default: column.default.clone(),
1027                enum_variants: column.enum_variants.clone(),
1028            })
1029        })
1030        .collect()
1031}
1032
1033fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1034    resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1035}
1036
1037fn data_type_name(data_type: DataType) -> &'static str {
1038    match data_type {
1039        DataType::Integer => "integer",
1040        DataType::UnsignedInteger => "unsigned_integer",
1041        DataType::Float => "float",
1042        DataType::Text => "text",
1043        DataType::Blob => "blob",
1044        DataType::Boolean => "boolean",
1045        DataType::Timestamp => "timestamp",
1046        DataType::Duration => "duration",
1047        DataType::IpAddr => "ipaddr",
1048        DataType::MacAddr => "macaddr",
1049        DataType::Vector => "vector",
1050        DataType::Nullable => "nullable",
1051        DataType::Unknown => "unknown",
1052        DataType::Json => "json",
1053        DataType::Uuid => "uuid",
1054        DataType::NodeRef => "noderef",
1055        DataType::EdgeRef => "edgeref",
1056        DataType::VectorRef => "vectorref",
1057        DataType::RowRef => "rowref",
1058        DataType::Color => "color",
1059        DataType::Email => "email",
1060        DataType::Url => "url",
1061        DataType::Phone => "phone",
1062        DataType::Semver => "semver",
1063        DataType::Cidr => "cidr",
1064        DataType::Date => "date",
1065        DataType::Time => "time",
1066        DataType::Decimal => "decimal",
1067        DataType::Enum => "enum",
1068        DataType::Array => "array",
1069        DataType::TimestampMs => "timestamp_ms",
1070        DataType::Ipv4 => "ipv4",
1071        DataType::Ipv6 => "ipv6",
1072        DataType::Subnet => "subnet",
1073        DataType::Port => "port",
1074        DataType::Latitude => "latitude",
1075        DataType::Longitude => "longitude",
1076        DataType::GeoPoint => "geopoint",
1077        DataType::Country2 => "country2",
1078        DataType::Country3 => "country3",
1079        DataType::Lang2 => "lang2",
1080        DataType::Lang5 => "lang5",
1081        DataType::Currency => "currency",
1082        DataType::AssetCode => "asset_code",
1083        DataType::Money => "money",
1084        DataType::ColorAlpha => "color_alpha",
1085        DataType::BigInt => "bigint",
1086        DataType::KeyRef => "keyref",
1087        DataType::DocRef => "docref",
1088        DataType::TableRef => "tableref",
1089        DataType::PageRef => "pageref",
1090        DataType::Secret => "secret",
1091        DataType::Password => "password",
1092        DataType::TextZstd => "text",
1093        DataType::BlobZstd => "blob",
1094    }
1095}
1096
1097fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1098    matches!(
1099        (value, target),
1100        (Value::Null, _)
1101            | (Value::Integer(_), DataType::Integer)
1102            | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1103            | (Value::Float(_), DataType::Float)
1104            | (Value::Text(_), DataType::Text)
1105            | (Value::Blob(_), DataType::Blob)
1106            | (Value::Boolean(_), DataType::Boolean)
1107            | (Value::Timestamp(_), DataType::Timestamp)
1108            | (Value::Duration(_), DataType::Duration)
1109            | (Value::IpAddr(_), DataType::IpAddr)
1110            | (Value::MacAddr(_), DataType::MacAddr)
1111            | (Value::Vector(_), DataType::Vector)
1112            | (Value::Json(_), DataType::Json)
1113            | (Value::Uuid(_), DataType::Uuid)
1114            | (Value::NodeRef(_), DataType::NodeRef)
1115            | (Value::EdgeRef(_), DataType::EdgeRef)
1116            | (Value::VectorRef(_, _), DataType::VectorRef)
1117            | (Value::RowRef(_, _), DataType::RowRef)
1118            | (Value::Color(_), DataType::Color)
1119            | (Value::Email(_), DataType::Email)
1120            | (Value::Url(_), DataType::Url)
1121            | (Value::Phone(_), DataType::Phone)
1122            | (Value::Semver(_), DataType::Semver)
1123            | (Value::Cidr(_, _), DataType::Cidr)
1124            | (Value::Date(_), DataType::Date)
1125            | (Value::Time(_), DataType::Time)
1126            | (Value::Decimal(_), DataType::Decimal)
1127            | (Value::EnumValue(_), DataType::Enum)
1128            | (Value::Array(_), DataType::Array)
1129            | (Value::TimestampMs(_), DataType::TimestampMs)
1130            | (Value::Ipv4(_), DataType::Ipv4)
1131            | (Value::Ipv6(_), DataType::Ipv6)
1132            | (Value::Subnet(_, _), DataType::Subnet)
1133            | (Value::Port(_), DataType::Port)
1134            | (Value::Latitude(_), DataType::Latitude)
1135            | (Value::Longitude(_), DataType::Longitude)
1136            | (Value::GeoPoint(_, _), DataType::GeoPoint)
1137            | (Value::Country2(_), DataType::Country2)
1138            | (Value::Country3(_), DataType::Country3)
1139            | (Value::Lang2(_), DataType::Lang2)
1140            | (Value::Lang5(_), DataType::Lang5)
1141            | (Value::Currency(_), DataType::Currency)
1142            | (Value::ColorAlpha(_), DataType::ColorAlpha)
1143            | (Value::BigInt(_), DataType::BigInt)
1144            | (Value::KeyRef(_, _), DataType::KeyRef)
1145            | (Value::DocRef(_, _), DataType::DocRef)
1146            | (Value::TableRef(_), DataType::TableRef)
1147            | (Value::PageRef(_), DataType::PageRef)
1148            | (Value::Secret(_), DataType::Secret)
1149            | (Value::Password(_), DataType::Password)
1150    )
1151}
1152
1153fn value_to_coercion_input(value: &Value) -> Option<String> {
1154    match value {
1155        Value::Null => None,
1156        Value::Integer(value) => Some(value.to_string()),
1157        Value::UnsignedInteger(value) => Some(value.to_string()),
1158        Value::Float(value) => Some(value.to_string()),
1159        Value::Text(value) => Some(value.to_string()),
1160        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1161        Value::Boolean(value) => Some(value.to_string()),
1162        Value::Timestamp(value) => Some(value.to_string()),
1163        Value::Duration(value) => Some(value.to_string()),
1164        Value::IpAddr(value) => Some(value.to_string()),
1165        Value::MacAddr(value) => Some(format!(
1166            "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1167            value[0], value[1], value[2], value[3], value[4], value[5]
1168        )),
1169        Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1170        Value::Email(value) => Some(value.clone()),
1171        Value::Url(value) => Some(value.clone()),
1172        Value::Phone(value) => Some(value.to_string()),
1173        Value::Semver(value) => Some(format!(
1174            "{}.{}.{}",
1175            value / 1_000_000,
1176            (value / 1_000) % 1_000,
1177            value % 1_000
1178        )),
1179        Value::Date(value) => Some(value.to_string()),
1180        Value::Time(value) => Some(value.to_string()),
1181        Value::Decimal(value) => Some(value.to_string()),
1182        Value::TimestampMs(value) => Some(value.to_string()),
1183        Value::Ipv4(value) => Some(format!(
1184            "{}.{}.{}.{}",
1185            (value >> 24) & 0xFF,
1186            (value >> 16) & 0xFF,
1187            (value >> 8) & 0xFF,
1188            value & 0xFF
1189        )),
1190        Value::Port(value) => Some(value.to_string()),
1191        Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1192        Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1193        Value::GeoPoint(lat, lon) => Some(format!(
1194            "{},{}",
1195            *lat as f64 / 1_000_000.0,
1196            *lon as f64 / 1_000_000.0
1197        )),
1198        Value::BigInt(value) => Some(value.to_string()),
1199        Value::TableRef(value) => Some(value.clone()),
1200        Value::PageRef(value) => Some(value.to_string()),
1201        Value::Password(value) => Some(value.clone()),
1202        _ => None,
1203    }
1204}
1205
1206fn dedupe_modified_columns(mut modified_columns: Vec<String>) -> Vec<String> {
1207    if modified_columns.is_empty() {
1208        return modified_columns;
1209    }
1210
1211    let mut unique = Vec::with_capacity(modified_columns.len());
1212    for column in modified_columns.drain(..) {
1213        if !unique
1214            .iter()
1215            .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1216        {
1217            unique.push(column);
1218        }
1219    }
1220    unique
1221}
1222
1223fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1224    if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1225        return Err(crate::RedDBError::Query(
1226            "array positional document patch paths are unsupported; replace the array or full document body instead"
1227                .to_string(),
1228        ));
1229    }
1230    Ok(())
1231}
1232
1233fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1234    match fields.get("body") {
1235        Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1236            crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1237        }),
1238        Some(_) => Err(crate::RedDBError::Query(
1239            "document body field must contain JSON".to_string(),
1240        )),
1241        None => Ok(JsonValue::Object(Default::default())),
1242    }
1243}
1244
1245fn replace_document_row_body(
1246    fields: &mut HashMap<String, Value>,
1247    body: JsonValue,
1248    modified_columns: &mut Vec<String>,
1249) -> RedDBResult<()> {
1250    modified_columns.push("body".to_string());
1251
1252    let old_keys: Vec<String> = fields
1253        .keys()
1254        .filter(|key| key.as_str() != "body")
1255        .cloned()
1256        .collect();
1257    for key in old_keys {
1258        fields.remove(&key);
1259        modified_columns.push(key);
1260    }
1261
1262    let body_bytes = json_to_vec(&body).map_err(|err| {
1263        crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1264    })?;
1265    fields.insert("body".to_string(), Value::Json(body_bytes));
1266
1267    if let JsonValue::Object(map) = &body {
1268        for (key, value) in map {
1269            fields.insert(key.clone(), json_to_storage_value(value)?);
1270            modified_columns.push(key.clone());
1271        }
1272    }
1273
1274    Ok(())
1275}
1276
1277impl RedDBRuntime {
1278    pub(crate) fn apply_loaded_patch_entity_core(
1279        &self,
1280        collection: String,
1281        mut entity: crate::storage::UnifiedEntity,
1282        payload: JsonValue,
1283        operations: Vec<PatchEntityOperation>,
1284    ) -> RedDBResult<AppliedEntityMutation> {
1285        let id = entity.id;
1286        let previous_xmax = entity.xmax;
1287        let operations = normalize_ttl_patch_operations(operations)?;
1288        // Snapshot pre-patch row fields for the secondary-index hook —
1289        // empty for non-row entities, which is the desired no-op.
1290        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1291
1292        // Versioned collections retain MVCC history: a PATCH/UPDATE must
1293        // produce a NEW full version (the merged document/row/node) with
1294        // the prior version tombstoned, mirroring the SQL UPDATE row
1295        // path. Gated STRICTLY on the collection `versioned` flag —
1296        // non-versioned collections keep last-writer-wins in-place
1297        // mutation. Row entities (documents / table rows / KV) carry
1298        // logical-id MVCC history (Phase 1/2); graph nodes & edges join
1299        // in Phase 3. The post-image re-stamp (new physical id, same
1300        // logical_id, fresh xmin) at the tail of this function is fully
1301        // kind-agnostic, so the same machinery installs a versioned
1302        // graph-node update.
1303        let patch_versions = matches!(
1304            entity.data,
1305            crate::storage::EntityData::Row(_)
1306                | crate::storage::EntityData::Node(_)
1307                | crate::storage::EntityData::Edge(_)
1308        ) && self.vcs_is_versioned(&collection).unwrap_or(false);
1309        let versioned_update_xid = if patch_versions {
1310            match self.current_xid() {
1311                Some(xid) => Some(xid),
1312                None => {
1313                    let snapshot_manager = self.snapshot_manager();
1314                    let xid = snapshot_manager.begin();
1315                    snapshot_manager.commit(xid);
1316                    Some(xid)
1317                }
1318            }
1319        } else {
1320            None
1321        };
1322        let mut replaced_entity = versioned_update_xid.map(|xid| {
1323            let mut old = entity.clone();
1324            old.set_xmax(xid);
1325            old
1326        });
1327
1328        let db = self.db();
1329        let store = db.store();
1330        let Some(manager) = store.get_collection(&collection) else {
1331            return Err(crate::RedDBError::NotFound(format!(
1332                "collection not found: {collection}"
1333            )));
1334        };
1335
1336        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1337        let mut metadata_changed = false;
1338        let mut modified_columns: Vec<String> = Vec::new();
1339        let mut context_index_dirty = false;
1340        let mut graph_node_type: Option<String> = None;
1341        let mut graph_edge_weight: Option<f32> = None;
1342
1343        let row_contract_timestamps = db
1344            .collection_contract(&collection)
1345            .map(|c| c.timestamps_enabled)
1346            .unwrap_or(false);
1347
1348        match &mut entity.data {
1349            crate::storage::EntityData::Row(row) => {
1350                let is_document_collection = db
1351                    .collection_contract(&collection)
1352                    .map(|contract| {
1353                        contract.declared_model == crate::catalog::CollectionModel::Document
1354                    })
1355                    .unwrap_or(false);
1356                let mut field_ops = Vec::new();
1357                let mut metadata_ops = Vec::new();
1358                let mut document_body_ops = Vec::new();
1359
1360                for mut op in operations {
1361                    let Some(root) = op.path.first().map(String::as_str) else {
1362                        return Err(crate::RedDBError::Query(
1363                            "patch path cannot be empty".to_string(),
1364                        ));
1365                    };
1366
1367                    match root {
1368                        "body" if is_document_collection => {
1369                            if op.path.len() < 2 {
1370                                return Err(crate::RedDBError::Query(
1371                                    "document body patch paths require a nested key; use payload.body for full replacement"
1372                                        .to_string(),
1373                                ));
1374                            }
1375                            op.path.remove(0);
1376                            reject_document_array_position_path(&op.path)?;
1377                            document_body_ops.push(op);
1378                        }
1379                        "fields" | "named" => {
1380                            if op.path.len() < 2 {
1381                                return Err(crate::RedDBError::Query(
1382                                    "patch path 'fields' requires a nested key".to_string(),
1383                                ));
1384                            }
1385                            if row_contract_timestamps {
1386                                let leaf = op.path.get(1).map(String::as_str);
1387                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
1388                                    return Err(crate::RedDBError::Query(format!(
1389                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1390                                        collection,
1391                                        leaf.unwrap_or("")
1392                                    )));
1393                                }
1394                            }
1395                            op.path.remove(0);
1396                            field_ops.push(op);
1397                        }
1398                        "metadata" => {
1399                            if op.path.len() < 2 {
1400                                return Err(crate::RedDBError::Query(
1401                                    "patch path 'metadata' requires a nested key".to_string(),
1402                                ));
1403                            }
1404                            op.path.remove(0);
1405                            metadata_ops.push(op);
1406                        }
1407                        _ => {
1408                            return Err(crate::RedDBError::Query(format!(
1409                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1410                            )));
1411                        }
1412                    }
1413                }
1414
1415                if !document_body_ops.is_empty() {
1416                    context_index_dirty = true;
1417                    let named = row.named.get_or_insert_with(Default::default);
1418                    let mut body = document_body_from_named(named)?;
1419                    apply_patch_operations_to_json(&mut body, &document_body_ops)
1420                        .map_err(crate::RedDBError::Query)?;
1421                    replace_document_row_body(named, body, &mut modified_columns)?;
1422                }
1423
1424                if is_document_collection {
1425                    if let Some(body) = payload.get("body") {
1426                        context_index_dirty = true;
1427                        let named = row.named.get_or_insert_with(Default::default);
1428                        replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1429                    }
1430                }
1431
1432                if !field_ops.is_empty() {
1433                    context_index_dirty = true;
1434                    for op in &field_ops {
1435                        if let Some(col) = op.path.first() {
1436                            modified_columns.push(col.clone());
1437                        }
1438                    }
1439                    let named = row.named.get_or_insert_with(Default::default);
1440                    apply_patch_operations_to_storage_map(named, &field_ops)?;
1441                }
1442
1443                if let Some(fields) = payload
1444                    .get("fields")
1445                    .and_then(crate::json::Value::as_object)
1446                {
1447                    if row_contract_timestamps {
1448                        for key in fields.keys() {
1449                            if key == "created_at" || key == "updated_at" {
1450                                return Err(crate::RedDBError::Query(format!(
1451                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1452                                    collection, key
1453                                )));
1454                            }
1455                        }
1456                    }
1457                    context_index_dirty = true;
1458                    let named = row.named.get_or_insert_with(Default::default);
1459                    for (key, value) in fields {
1460                        modified_columns.push(key.clone());
1461                        named.insert(key.clone(), json_to_storage_value(value)?);
1462                    }
1463                }
1464
1465                if !metadata_ops.is_empty() {
1466                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1467                    let metadata = patch_metadata.get_or_insert_with(|| {
1468                        store.get_metadata(&collection, id).unwrap_or_default()
1469                    });
1470                    let mut metadata_json = metadata_to_json(metadata);
1471                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1472                        .map_err(crate::RedDBError::Query)?;
1473                    *metadata = metadata_from_json(&metadata_json)?;
1474                    metadata_changed = true;
1475                }
1476
1477                if !modified_columns.is_empty() || row_contract_timestamps {
1478                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1479                    let current_fields = if let Some(named) = row.named.take() {
1480                        named.into_iter().collect::<Vec<_>>()
1481                    } else if let Some(schema) = row.schema.as_ref() {
1482                        schema
1483                            .iter()
1484                            .cloned()
1485                            .zip(row.columns.iter().cloned())
1486                            .collect::<Vec<_>>()
1487                    } else {
1488                        Vec::new()
1489                    };
1490                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
1491                    if row_contract_timestamps {
1492                        modified_columns.push("updated_at".to_string());
1493                        context_index_dirty = true;
1494                    }
1495                    if contract.requires_uniqueness_check(&modified_columns) {
1496                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1497                    }
1498                    row.named = Some(normalized_fields.into_iter().collect());
1499                }
1500            }
1501            crate::storage::EntityData::Node(node) => {
1502                let mut field_ops = Vec::new();
1503                let mut metadata_ops = Vec::new();
1504                let mut node_type_ops = Vec::new();
1505
1506                for mut op in operations {
1507                    let Some(root) = op.path.first().map(String::as_str) else {
1508                        return Err(crate::RedDBError::Query(
1509                            "patch path cannot be empty".to_string(),
1510                        ));
1511                    };
1512
1513                    match root {
1514                        "fields" | "properties" => {
1515                            if op.path.len() < 2 {
1516                                return Err(crate::RedDBError::Query(
1517                                    "patch path 'fields' requires a nested key".to_string(),
1518                                ));
1519                            }
1520                            op.path.remove(0);
1521                            field_ops.push(op);
1522                        }
1523                        "metadata" => {
1524                            if op.path.len() < 2 {
1525                                return Err(crate::RedDBError::Query(
1526                                    "patch path 'metadata' requires a nested key".to_string(),
1527                                ));
1528                            }
1529                            op.path.remove(0);
1530                            metadata_ops.push(op);
1531                        }
1532                        "node_type" => {
1533                            if op.path.len() != 1 {
1534                                return Err(crate::RedDBError::Query(
1535                                    "patch path 'node_type' does not allow nested keys".to_string(),
1536                                ));
1537                            }
1538                            op.path.clear();
1539                            node_type_ops.push(op);
1540                        }
1541                        _ => {
1542                            return Err(crate::RedDBError::Query(format!(
1543                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1544                            )));
1545                        }
1546                    }
1547                }
1548
1549                for op in node_type_ops {
1550                    context_index_dirty = true;
1551                    let value = op.value.ok_or_else(|| {
1552                        crate::RedDBError::Query("node_type operations require a value".to_string())
1553                    })?;
1554
1555                    match op.op {
1556                        PatchEntityOperationType::Unset => {
1557                            return Err(crate::RedDBError::Query(
1558                                "node_type cannot be unset through patch operations".to_string(),
1559                            ));
1560                        }
1561                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1562                            let Some(node_type) = value.as_str() else {
1563                                return Err(crate::RedDBError::Query(
1564                                    "node_type operation requires a text value".to_string(),
1565                                ));
1566                            };
1567                            graph_node_type = Some(node_type.to_string());
1568                            modified_columns.push("node_type".to_string());
1569                        }
1570                    }
1571                }
1572
1573                if !field_ops.is_empty() {
1574                    context_index_dirty = true;
1575                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1576                }
1577
1578                if let Some(fields) = payload
1579                    .get("fields")
1580                    .and_then(crate::json::Value::as_object)
1581                {
1582                    context_index_dirty = true;
1583                    for (key, value) in fields {
1584                        node.properties
1585                            .insert(key.clone(), json_to_storage_value(value)?);
1586                    }
1587                }
1588
1589                if !metadata_ops.is_empty() {
1590                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1591                    let metadata = patch_metadata.get_or_insert_with(|| {
1592                        store.get_metadata(&collection, id).unwrap_or_default()
1593                    });
1594                    let mut metadata_json = metadata_to_json(metadata);
1595                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1596                        .map_err(crate::RedDBError::Query)?;
1597                    *metadata = metadata_from_json(&metadata_json)?;
1598                    metadata_changed = true;
1599                }
1600            }
1601            crate::storage::EntityData::Edge(edge) => {
1602                let mut field_ops = Vec::new();
1603                let mut metadata_ops = Vec::new();
1604                let mut weight_ops = Vec::new();
1605
1606                for mut op in operations {
1607                    let Some(root) = op.path.first().map(String::as_str) else {
1608                        return Err(crate::RedDBError::Query(
1609                            "patch path cannot be empty".to_string(),
1610                        ));
1611                    };
1612
1613                    match root {
1614                        "fields" | "properties" => {
1615                            if op.path.len() < 2 {
1616                                return Err(crate::RedDBError::Query(
1617                                    "patch path 'fields' requires a nested key".to_string(),
1618                                ));
1619                            }
1620                            op.path.remove(0);
1621                            field_ops.push(op);
1622                        }
1623                        "weight" => {
1624                            if op.path.len() != 1 {
1625                                return Err(crate::RedDBError::Query(
1626                                    "patch path 'weight' does not allow nested keys".to_string(),
1627                                ));
1628                            }
1629                            op.path.clear();
1630                            weight_ops.push(op);
1631                        }
1632                        "metadata" => {
1633                            if op.path.len() < 2 {
1634                                return Err(crate::RedDBError::Query(
1635                                    "patch path 'metadata' requires a nested key".to_string(),
1636                                ));
1637                            }
1638                            op.path.remove(0);
1639                            metadata_ops.push(op);
1640                        }
1641                        _ => {
1642                            return Err(crate::RedDBError::Query(format!(
1643                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1644                            )));
1645                        }
1646                    }
1647                }
1648
1649                if !field_ops.is_empty() {
1650                    context_index_dirty = true;
1651                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1652                }
1653
1654                for op in weight_ops {
1655                    context_index_dirty = true;
1656                    let value = op.value.ok_or_else(|| {
1657                        crate::RedDBError::Query("weight operations require a value".to_string())
1658                    })?;
1659
1660                    match op.op {
1661                        PatchEntityOperationType::Unset => {
1662                            return Err(crate::RedDBError::Query(
1663                                "weight cannot be unset through patch operations".to_string(),
1664                            ));
1665                        }
1666                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1667                            let Some(weight) = value.as_f64() else {
1668                                return Err(crate::RedDBError::Query(
1669                                    "weight operation requires a numeric value".to_string(),
1670                                ));
1671                            };
1672                            graph_edge_weight = Some(weight as f32);
1673                            edge.weight = weight as f32;
1674                            modified_columns.push("weight".to_string());
1675                        }
1676                    }
1677                }
1678
1679                if let Some(fields) = payload
1680                    .get("fields")
1681                    .and_then(crate::json::Value::as_object)
1682                {
1683                    context_index_dirty = true;
1684                    for (key, value) in fields {
1685                        edge.properties
1686                            .insert(key.clone(), json_to_storage_value(value)?);
1687                    }
1688                }
1689
1690                if !metadata_ops.is_empty() {
1691                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1692                    let metadata = patch_metadata.get_or_insert_with(|| {
1693                        store.get_metadata(&collection, id).unwrap_or_default()
1694                    });
1695                    let mut metadata_json = metadata_to_json(metadata);
1696                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1697                        .map_err(crate::RedDBError::Query)?;
1698                    *metadata = metadata_from_json(&metadata_json)?;
1699                    metadata_changed = true;
1700                }
1701            }
1702            crate::storage::EntityData::Vector(vector) => {
1703                let mut field_ops = Vec::new();
1704                let mut metadata_ops = Vec::new();
1705
1706                for mut op in operations {
1707                    let Some(root) = op.path.first().map(String::as_str) else {
1708                        return Err(crate::RedDBError::Query(
1709                            "patch path cannot be empty".to_string(),
1710                        ));
1711                    };
1712
1713                    match root {
1714                        "fields" => {
1715                            if op.path.len() < 2 {
1716                                return Err(crate::RedDBError::Query(
1717                                    "patch path 'fields' requires a nested key".to_string(),
1718                                ));
1719                            }
1720                            op.path.remove(0);
1721                            let Some(target) = op.path.first().map(String::as_str) else {
1722                                return Err(crate::RedDBError::Query(
1723                                    "patch path requires a target under fields".to_string(),
1724                                ));
1725                            };
1726                            if !matches!(target, "dense" | "content" | "sparse") {
1727                                return Err(crate::RedDBError::Query(format!(
1728                                    "unsupported vector patch target '{target}'"
1729                                )));
1730                            }
1731                            field_ops.push(op);
1732                        }
1733                        "metadata" => {
1734                            if op.path.len() < 2 {
1735                                return Err(crate::RedDBError::Query(
1736                                    "patch path 'metadata' requires a nested key".to_string(),
1737                                ));
1738                            }
1739                            op.path.remove(0);
1740                            metadata_ops.push(op);
1741                        }
1742                        _ => {
1743                            return Err(crate::RedDBError::Query(format!(
1744                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1745                            )));
1746                        }
1747                    }
1748                }
1749
1750                if !field_ops.is_empty() {
1751                    context_index_dirty = true;
1752                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1753                }
1754
1755                if let Some(fields) = payload
1756                    .get("fields")
1757                    .and_then(crate::json::Value::as_object)
1758                {
1759                    context_index_dirty = true;
1760                    if let Some(content) =
1761                        fields.get("content").and_then(crate::json::Value::as_str)
1762                    {
1763                        vector.content = Some(content.to_string());
1764                    }
1765                    if let Some(dense) = fields.get("dense") {
1766                        vector.dense = dense
1767                            .as_array()
1768                            .ok_or_else(|| {
1769                                crate::RedDBError::Query(
1770                                    "field 'dense' must be an array".to_string(),
1771                                )
1772                            })?
1773                            .iter()
1774                            .map(|value| {
1775                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
1776                                    crate::RedDBError::Query(
1777                                        "field 'dense' must contain only numbers".to_string(),
1778                                    )
1779                                })
1780                            })
1781                            .collect::<Result<Vec<_>, _>>()?;
1782                    }
1783                }
1784
1785                if !metadata_ops.is_empty() {
1786                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1787                    let metadata = patch_metadata.get_or_insert_with(|| {
1788                        store.get_metadata(&collection, id).unwrap_or_default()
1789                    });
1790                    let mut metadata_json = metadata_to_json(metadata);
1791                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1792                        .map_err(crate::RedDBError::Query)?;
1793                    *metadata = metadata_from_json(&metadata_json)?;
1794                    metadata_changed = true;
1795                }
1796            }
1797            crate::storage::EntityData::TimeSeries(_)
1798            | crate::storage::EntityData::QueueMessage(_) => {
1799                return Err(crate::RedDBError::Query(
1800                    "patch operations are not supported for TimeSeries or QueueMessage entities"
1801                        .to_string(),
1802                ));
1803            }
1804        }
1805
1806        if let Some(node_type) = graph_node_type {
1807            if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1808                node.node_type = node_type;
1809            }
1810        }
1811        if let Some(weight) = graph_edge_weight {
1812            if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1813                edge.weight = (weight * 1000.0) as u32;
1814            }
1815        }
1816
1817        if let Some(metadata) = payload
1818            .get("metadata")
1819            .and_then(crate::json::Value::as_object)
1820        {
1821            let patch_metadata = patch_metadata
1822                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1823            for (key, value) in metadata {
1824                ensure_non_tree_reserved_metadata_key(key)?;
1825                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1826            }
1827            metadata_changed = true;
1828        }
1829
1830        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1831            let patch_metadata = patch_metadata
1832                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1833            if matches!(value, crate::storage::unified::MetadataValue::Null) {
1834                patch_metadata.remove(&key);
1835            } else {
1836                patch_metadata.set(key, value);
1837            }
1838            metadata_changed = true;
1839        }
1840
1841        entity.updated_at = std::time::SystemTime::now()
1842            .duration_since(std::time::UNIX_EPOCH)
1843            .unwrap_or_default()
1844            .as_secs();
1845
1846        // Versioned PATCH: re-stamp the merged post-image as a fresh
1847        // physical version keyed on the same logical_id, and close the
1848        // prior version's xmax at the same xid. `install_versioned_*`
1849        // in persist installs the new row and tombstones the old one;
1850        // `record_pending_versioned_update` arms first-committer-wins.
1851        if let Some(xid) = versioned_update_xid {
1852            let logical_id = entity.logical_id();
1853            entity.id = store.next_entity_id();
1854            entity.set_logical_id(logical_id);
1855            entity.set_xmin(xid);
1856            entity.set_xmax(0);
1857            if let Some(old) = replaced_entity.as_mut() {
1858                old.set_xmax(xid);
1859            }
1860        }
1861
1862        modified_columns = dedupe_modified_columns(modified_columns);
1863
1864        Ok(AppliedEntityMutation {
1865            id: entity.id,
1866            collection,
1867            entity,
1868            metadata: patch_metadata,
1869            modified_columns,
1870            persist_metadata: metadata_changed,
1871            context_index_dirty,
1872            replaced_entity,
1873            replaced_entity_previous_xmax: previous_xmax,
1874            pre_mutation_fields,
1875        })
1876    }
1877
1878    pub(crate) fn apply_loaded_sql_update_row_core(
1879        &self,
1880        collection: String,
1881        mut entity: crate::storage::UnifiedEntity,
1882        static_field_assignments: &[(String, Value)],
1883        dynamic_field_assignments: Vec<(String, Value)>,
1884        static_metadata_assignments: &[(String, MetadataValue)],
1885        dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1886        row_contract_plan: Option<&RowUpdateContractPlan>,
1887        row_modified_columns_template: &[String],
1888        row_touches_unique_columns: bool,
1889    ) -> RedDBResult<AppliedEntityMutation> {
1890        let id = entity.id;
1891        let previous_xmax = entity.xmax;
1892        let db = self.db();
1893        let store = db.store();
1894        let Some(_) = store.get_collection(&collection) else {
1895            return Err(crate::RedDBError::NotFound(format!(
1896                "collection not found: {collection}"
1897            )));
1898        };
1899
1900        let versioned_update_xid = match self.current_xid() {
1901            Some(xid) => Some(xid),
1902            None => {
1903                let snapshot_manager = self.snapshot_manager();
1904                let xid = snapshot_manager.begin();
1905                snapshot_manager.commit(xid);
1906                Some(xid)
1907            }
1908        };
1909        let mut replaced_entity = versioned_update_xid.map(|xid| {
1910            let mut old = entity.clone();
1911            old.set_xmax(xid);
1912            old
1913        });
1914
1915        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1916        let row_contract_timestamps = row_contract_plan
1917            .map(|plan| plan.timestamps_enabled)
1918            .unwrap_or(false);
1919        let mut metadata_changed = false;
1920        let mut modified_columns = row_modified_columns_template.to_vec();
1921        let mut context_index_dirty = !modified_columns.is_empty();
1922
1923        // Snapshot OLD field values BEFORE applying the assignments —
1924        // the secondary-index maintenance hook needs both before/after to
1925        // delete-then-insert under changed indexed columns.
1926        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1927
1928        let crate::storage::EntityData::Row(row) = &mut entity.data else {
1929            return Err(crate::RedDBError::Query(
1930                "SQL row update fast path requires a row entity".to_string(),
1931            ));
1932        };
1933
1934        let _ = row_contract_plan;
1935        apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1936        apply_row_field_assignments_raw(row, dynamic_field_assignments);
1937
1938        for (key, value) in static_metadata_assignments
1939            .iter()
1940            .cloned()
1941            .chain(dynamic_metadata_assignments)
1942        {
1943            ensure_non_tree_reserved_metadata_key(&key)?;
1944            patch_metadata
1945                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1946                .set(key, value);
1947            metadata_changed = true;
1948        }
1949
1950        if !modified_columns.is_empty() || row_contract_timestamps {
1951            let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1952            if row_contract_timestamps {
1953                context_index_dirty = true;
1954                set_row_field(
1955                    row,
1956                    "updated_at",
1957                    Value::UnsignedInteger(current_unix_ms_u64()),
1958                );
1959                modified_columns.push("updated_at".to_string());
1960            }
1961            if row_touches_unique_columns {
1962                let current_fields = collect_row_fields(row);
1963                contract.enforce_row_uniqueness(&current_fields, Some(id))?;
1964            }
1965        }
1966
1967        entity.updated_at = std::time::SystemTime::now()
1968            .duration_since(std::time::UNIX_EPOCH)
1969            .unwrap_or_default()
1970            .as_secs();
1971
1972        if let Some(xid) = versioned_update_xid {
1973            let logical_id = entity.logical_id();
1974            entity.id = store.next_entity_id();
1975            entity.set_logical_id(logical_id);
1976            entity.set_xmin(xid);
1977            entity.set_xmax(0);
1978            if let Some(old) = replaced_entity.as_mut() {
1979                old.set_xmax(xid);
1980            }
1981        }
1982
1983        modified_columns = dedupe_modified_columns(modified_columns);
1984
1985        Ok(AppliedEntityMutation {
1986            id: entity.id,
1987            collection,
1988            entity,
1989            metadata: patch_metadata,
1990            modified_columns,
1991            persist_metadata: metadata_changed,
1992            context_index_dirty,
1993            replaced_entity,
1994            replaced_entity_previous_xmax: previous_xmax,
1995            pre_mutation_fields,
1996        })
1997    }
1998
1999    pub(crate) fn persist_applied_entity_mutations(
2000        &self,
2001        applied: &[AppliedEntityMutation],
2002    ) -> RedDBResult<()> {
2003        if applied.is_empty() {
2004            return Ok(());
2005        }
2006
2007        let store = self.db().store();
2008        let collection = &applied[0].collection;
2009        let Some(manager) = store.get_collection(collection) else {
2010            return Err(crate::RedDBError::NotFound(format!(
2011                "collection not found: {collection}"
2012            )));
2013        };
2014
2015        let mut ordinary = Vec::with_capacity(applied.len());
2016        for item in applied {
2017            if let Some(old_version) = item.replaced_entity.as_ref() {
2018                store
2019                    .install_versioned_table_row_update(
2020                        collection,
2021                        old_version.clone(),
2022                        item.entity.clone(),
2023                        if item.persist_metadata {
2024                            item.metadata.as_ref()
2025                        } else {
2026                            None
2027                        },
2028                    )
2029                    .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2030                if self.current_xid().is_some() {
2031                    self.record_pending_versioned_update(
2032                        crate::runtime::impl_core::current_connection_id(),
2033                        collection,
2034                        old_version.id,
2035                        item.entity.id,
2036                        old_version.xmax,
2037                        item.replaced_entity_previous_xmax,
2038                    );
2039                }
2040            } else {
2041                ordinary.push(item);
2042            }
2043        }
2044        if ordinary.is_empty() {
2045            return Ok(());
2046        }
2047
2048        manager
2049            .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
2050                (
2051                    &item.entity,
2052                    item.modified_columns.as_slice(),
2053                    if item.persist_metadata {
2054                        item.metadata.as_ref()
2055                    } else {
2056                        None
2057                    },
2058                )
2059            }))
2060            .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
2061
2062        // PG-HOT-like fast path: segment in-place is done; when no
2063        // mutation touches a secondary-indexed column AND no metadata
2064        // payload needs to be folded into a B-tree record, skip the
2065        // in-line B-tree upsert. The WAL still records the update
2066        // (durability preserved; recovery replay rebuilds the B-tree),
2067        // and `manager.get()` prefers the live segment over the
2068        // B-tree for reads — so the short-circuit is invisible to
2069        // callers. See `persist_entities_to_pager_wal_only`.
2070        let indexed_cols = self
2071            .index_store_ref()
2072            .indexed_columns_set(collection.as_str());
2073        let all_hot = !indexed_cols.is_empty()
2074            && ordinary.iter().all(|item| {
2075                !item.persist_metadata
2076                    && !item
2077                        .modified_columns
2078                        .iter()
2079                        .any(|c| indexed_cols.contains(c))
2080            })
2081            || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2082
2083        // Pass `&[&UnifiedEntity]` — no per-entity clone. The SQL UPDATE
2084        // inner loop hands us `applied` which already owns the post-image
2085        // entity; all we need for the persist path is a read borrow.
2086        let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2087            ordinary.iter().map(|item| &item.entity).collect();
2088        let persist_fn = if all_hot {
2089            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2090        } else {
2091            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2092        };
2093        persist_fn(store.as_ref(), collection, &entity_refs)
2094            .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2095    }
2096
2097    pub(crate) fn flush_applied_entity_mutation(
2098        &self,
2099        applied: &AppliedEntityMutation,
2100    ) -> RedDBResult<()> {
2101        let store = self.db().store();
2102        if applied.context_index_dirty {
2103            store
2104                .context_index()
2105                .index_entity(&applied.collection, &applied.entity);
2106        }
2107        // Secondary-index maintenance for SQL UPDATE / JSON-Patch flows.
2108        // Skip when pre_mutation_fields is empty (entity wasn't a row, or
2109        // didn't carry recoverable column names) — there's nothing to
2110        // delete-then-insert in that case.
2111        //
2112        // Also build the CDC damage vector here so downstream consumers
2113        // see which columns changed without re-diffing.
2114        let mut changed_columns: Option<Vec<String>> = None;
2115        if !applied.pre_mutation_fields.is_empty() {
2116            let post = entity_row_fields_snapshot(&applied.entity);
2117            if !post.is_empty() {
2118                let damage = crate::application::entity::row_damage_vector(
2119                    &applied.pre_mutation_fields,
2120                    &post,
2121                );
2122                if !damage.is_empty() {
2123                    changed_columns = Some(
2124                        damage
2125                            .touched_columns()
2126                            .into_iter()
2127                            .map(str::to_string)
2128                            .collect(),
2129                    );
2130                }
2131
2132                // HOT-like fast path (P3.T2/T3): when no modified
2133                // column is covered by a secondary index, skip the
2134                // `index_entity_update` call entirely. The function
2135                // would short-circuit internally, but the call still
2136                // reads the registry lock + walks the damage vector
2137                // — avoiding it saves a few microseconds per UPDATE.
2138                // Page-local replace + t_ctid chain support (true
2139                // HOT) lives in a follow-up storage spec.
2140                let indexed_cols: std::collections::HashSet<String> = self
2141                    .index_store_ref()
2142                    .list_indices(applied.collection.as_str())
2143                    .into_iter()
2144                    .filter_map(|idx| idx.columns.first().cloned())
2145                    .collect();
2146                let modified_cols: std::collections::HashSet<String> = damage
2147                    .touched_columns()
2148                    .into_iter()
2149                    .map(str::to_string)
2150                    .collect();
2151                if let Some(old_version) = applied.replaced_entity.as_ref() {
2152                    let old_index_fields: Vec<(String, Value)> = applied
2153                        .pre_mutation_fields
2154                        .iter()
2155                        .filter(|(col, _)| indexed_cols.contains(col))
2156                        .cloned()
2157                        .collect();
2158                    let new_index_fields: Vec<(String, Value)> = post
2159                        .iter()
2160                        .filter(|(col, _)| indexed_cols.contains(col))
2161                        .cloned()
2162                        .collect();
2163                    if !old_index_fields.is_empty() {
2164                        self.index_store_ref()
2165                            .index_entity_delete(
2166                                &applied.collection,
2167                                old_version.id,
2168                                &old_index_fields,
2169                            )
2170                            .map_err(crate::RedDBError::Internal)?;
2171                    }
2172                    if !new_index_fields.is_empty() {
2173                        self.index_store_ref()
2174                            .index_entity_insert(
2175                                &applied.collection,
2176                                applied.entity.id,
2177                                &new_index_fields,
2178                            )
2179                            .map_err(crate::RedDBError::Internal)?;
2180                    }
2181                } else {
2182                    let decision = crate::storage::engine::hot_update::decide(
2183                        &crate::storage::engine::hot_update::HotUpdateInputs {
2184                            collection: applied.collection.as_str(),
2185                            indexed_columns: &indexed_cols,
2186                            modified_columns: &modified_cols,
2187                            // The storage layer currently handles fit via
2188                            // the segment abstraction; we bypass the
2189                            // page-size check here.
2190                            new_tuple_size: 0,
2191                            page_free_space: usize::MAX,
2192                        },
2193                    );
2194                    if !decision.can_hot {
2195                        self.index_store_ref()
2196                            .index_entity_update(
2197                                &applied.collection,
2198                                applied.id,
2199                                &applied.pre_mutation_fields,
2200                                &post,
2201                            )
2202                            .map_err(crate::RedDBError::Internal)?;
2203                    } else {
2204                        // F-04: `applied.collection` is tenant-supplied;
2205                        // strip CR/LF/control bytes via the LogField
2206                        // escaper (ADR 0010).
2207                        tracing::debug!(
2208                            collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2209                            "hot_update fast-path: skipped index_entity_update"
2210                        );
2211                    }
2212                }
2213            }
2214        }
2215        self.cdc_emit_prebuilt_with_columns(
2216            crate::replication::cdc::ChangeOperation::Update,
2217            &applied.collection,
2218            &applied.entity,
2219            "entity",
2220            applied.metadata.as_ref(),
2221            true,
2222            changed_columns,
2223        );
2224        Ok(())
2225    }
2226
2227    pub(crate) fn apply_loaded_patch_entity(
2228        &self,
2229        collection: String,
2230        entity: crate::storage::UnifiedEntity,
2231        payload: JsonValue,
2232        operations: Vec<PatchEntityOperation>,
2233    ) -> RedDBResult<CreateEntityOutput> {
2234        let applied =
2235            self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2236        self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2237        self.flush_applied_entity_mutation(&applied)?;
2238        Ok(CreateEntityOutput {
2239            id: applied.id,
2240            entity: Some(applied.entity),
2241        })
2242    }
2243}
2244
2245fn ensure_non_tree_reserved_metadata_patch_paths(
2246    operations: &[PatchEntityOperation],
2247) -> RedDBResult<()> {
2248    for operation in operations {
2249        let Some(key) = operation.path.first().map(String::as_str) else {
2250            continue;
2251        };
2252        ensure_non_tree_reserved_metadata_key(key)?;
2253    }
2254    Ok(())
2255}
2256
2257fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2258    if key.starts_with(TREE_METADATA_PREFIX) {
2259        return Err(crate::RedDBError::Query(format!(
2260            "metadata key '{}' is reserved for managed trees",
2261            key
2262        )));
2263    }
2264    Ok(())
2265}
2266
2267fn ensure_non_tree_reserved_metadata_entries(
2268    metadata: &[(String, MetadataValue)],
2269) -> RedDBResult<()> {
2270    for (key, _) in metadata {
2271        ensure_non_tree_reserved_metadata_key(key)?;
2272    }
2273    Ok(())
2274}
2275
2276fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2277    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2278        return Err(crate::RedDBError::Query(format!(
2279            "edge label '{}' is reserved for managed trees",
2280            TREE_CHILD_EDGE_LABEL
2281        )));
2282    }
2283    Ok(())
2284}
2285
2286impl RedDBRuntime {
2287    pub(crate) fn create_node_unchecked(
2288        &self,
2289        input: CreateNodeInput,
2290    ) -> RedDBResult<CreateEntityOutput> {
2291        let db = self.db();
2292        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2293        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2294        let mut metadata = input.metadata;
2295        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2296        let mut builder = db.node(&input.collection, &input.label);
2297
2298        if let Some(node_type) = input.node_type {
2299            builder = builder.node_type(node_type);
2300        }
2301
2302        for (key, value) in input.properties {
2303            builder = builder.property(key, value);
2304        }
2305
2306        for (key, value) in metadata {
2307            builder = builder.metadata(key, value);
2308        }
2309
2310        for embedding in input.embeddings {
2311            if let Some(model) = embedding.model {
2312                builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2313            } else {
2314                builder = builder.embedding(embedding.name, embedding.vector);
2315            }
2316        }
2317
2318        for link in input.table_links {
2319            builder = builder.link_to_table(link.key, link.table);
2320        }
2321
2322        for link in input.node_links {
2323            builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2324        }
2325
2326        let id = builder.save()?;
2327        // Phase 1.1 MVCC universal: stamp xmin so concurrent snapshots
2328        // don't see this node until the transaction commits.
2329        self.stamp_xmin_if_in_txn(&input.collection, id);
2330        refresh_context_index(&db, &input.collection, id)?;
2331        self.cdc_emit(
2332            crate::replication::cdc::ChangeOperation::Insert,
2333            &input.collection,
2334            id.raw(),
2335            "graph_node",
2336        );
2337        Ok(CreateEntityOutput {
2338            id,
2339            entity: db.store().get(&input.collection, id),
2340        })
2341    }
2342
2343    pub(crate) fn create_edge_unchecked(
2344        &self,
2345        input: CreateEdgeInput,
2346    ) -> RedDBResult<CreateEntityOutput> {
2347        let db = self.db();
2348        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2349        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2350        let mut metadata = input.metadata;
2351        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2352        let mut builder = db
2353            .edge(&input.collection, &input.label)
2354            .from(input.from)
2355            .to(input.to);
2356
2357        if let Some(weight) = input.weight {
2358            builder = builder.weight(weight);
2359        }
2360
2361        for (key, value) in input.properties {
2362            builder = builder.property(key, value);
2363        }
2364
2365        for (key, value) in metadata {
2366            builder = builder.metadata(key, value);
2367        }
2368
2369        let id = builder.save()?;
2370        // Phase 1.1 MVCC universal: stamp xmin on the edge so other
2371        // sessions don't follow it until COMMIT.
2372        self.stamp_xmin_if_in_txn(&input.collection, id);
2373        refresh_context_index(&db, &input.collection, id)?;
2374        self.cdc_emit(
2375            crate::replication::cdc::ChangeOperation::Insert,
2376            &input.collection,
2377            id.raw(),
2378            "graph_edge",
2379        );
2380        Ok(CreateEntityOutput {
2381            id,
2382            entity: db.store().get(&input.collection, id),
2383        })
2384    }
2385}
2386
2387fn create_rows_batch_prevalidated_columnar_with_outputs(
2388    runtime: &RedDBRuntime,
2389    collection: String,
2390    column_names: std::sync::Arc<Vec<String>>,
2391    rows: Vec<Vec<crate::storage::schema::Value>>,
2392) -> RedDBResult<Vec<CreateEntityOutput>> {
2393    use crate::storage::{
2394        unified::{EntityData, EntityKind, RowData},
2395        EntityId, UnifiedEntity,
2396    };
2397    use std::sync::Arc;
2398
2399    if rows.is_empty() {
2400        return Ok(Vec::new());
2401    }
2402    runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2403    runtime.check_batch_size(rows.len())?;
2404    runtime.check_db_size()?;
2405
2406    let db = runtime.db();
2407    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2408    contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2409
2410    let store = db.store();
2411    let table_arc: Arc<str> = Arc::from(collection.as_str());
2412
2413    let indexed_cols = runtime
2414        .index_store_ref()
2415        .indexed_columns_set(collection.as_str());
2416    let has_secondary_indexes = !indexed_cols.is_empty();
2417    let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2418        if has_secondary_indexes {
2419            Vec::with_capacity(rows.len())
2420        } else {
2421            Vec::new()
2422        };
2423
2424    let entities: Vec<UnifiedEntity> = rows
2425        .into_iter()
2426        .map(|values| {
2427            if has_secondary_indexes {
2428                let mut snap: Vec<(String, crate::storage::schema::Value)> =
2429                    Vec::with_capacity(indexed_cols.len());
2430                for (name, val) in column_names.iter().zip(values.iter()) {
2431                    if indexed_cols.contains(name) {
2432                        snap.push((name.clone(), val.clone()));
2433                    }
2434                }
2435                field_snapshots.push(snap);
2436            }
2437            let mut row = RowData::new(values);
2438            row.schema = Some(Arc::clone(&column_names));
2439            UnifiedEntity::new(
2440                EntityId::new(0),
2441                EntityKind::TableRow {
2442                    table: Arc::clone(&table_arc),
2443                    row_id: 0,
2444                },
2445                EntityData::Row(row),
2446            )
2447        })
2448        .collect();
2449
2450    let ids = store
2451        .bulk_insert(&collection, entities)
2452        .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2453
2454    if has_secondary_indexes {
2455        let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2456            .iter()
2457            .zip(field_snapshots)
2458            .map(|(id, fields)| (*id, fields))
2459            .collect();
2460        runtime
2461            .index_store_ref()
2462            .index_entity_insert_batch(&collection, &index_rows)
2463            .map_err(crate::RedDBError::Internal)?;
2464    }
2465
2466    runtime.invalidate_result_cache();
2467    runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2468
2469    Ok(ids
2470        .into_iter()
2471        .map(|id| CreateEntityOutput { id, entity: None })
2472        .collect())
2473}
2474
2475impl RuntimeEntityPort for RedDBRuntime {
2476    fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2477        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2478        let db = self.db();
2479        let CreateRowInput {
2480            collection,
2481            fields,
2482            metadata: input_metadata,
2483            node_links,
2484            vector_links,
2485        } = input;
2486        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2487        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2488        let mut metadata = input_metadata;
2489        apply_collection_default_ttl(&db, &collection, &mut metadata);
2490        let fields = contract.normalize_insert_fields(fields)?;
2491        contract.enforce_row_uniqueness(&fields, None)?;
2492        // Route through MutationEngine for unified hot path.
2493        let engine = self.mutation_engine();
2494        let result = engine.apply(
2495            collection.clone(),
2496            vec![crate::runtime::mutation::MutationRow {
2497                fields,
2498                metadata,
2499                node_links,
2500                vector_links,
2501            }],
2502        )?;
2503        let id = result.ids[0];
2504        // Perf: `db.get(id)` does a *cross-collection* scan (get_any)
2505        // that also takes a write lock on the entity cache. We know
2506        // the collection — hit the manager directly. Cuts
2507        // create_row() p50 roughly in half on the hot path.
2508        Ok(CreateEntityOutput {
2509            id,
2510            entity: db.store().get(&collection, id),
2511        })
2512    }
2513
2514    fn create_rows_batch(
2515        &self,
2516        input: CreateRowsBatchInput,
2517    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2518        if input.rows.is_empty() {
2519            return Ok(Vec::new());
2520        }
2521        self.check_batch_size(input.rows.len())?;
2522        self.check_db_size()?;
2523
2524        let db = self.db();
2525        let collection = input.collection;
2526        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2527        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2528
2529        let mut prepared_rows = Vec::with_capacity(input.rows.len());
2530        let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2531        for row in input.rows {
2532            if row.collection != collection {
2533                return Err(crate::RedDBError::Query(format!(
2534                    "batch row collection mismatch: expected '{}', got '{}'",
2535                    collection, row.collection
2536                )));
2537            }
2538
2539            let mut metadata = row.metadata;
2540            apply_collection_default_ttl(&db, &collection, &mut metadata);
2541            let fields = contract.normalize_insert_fields(row.fields)?;
2542            contract.enforce_row_uniqueness(&fields, None)?;
2543            uniqueness_rows.push(fields.clone());
2544            prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2545        }
2546
2547        contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2548
2549        // Route through MutationEngine: single bulk_insert + one CDC batch
2550        // instead of N separate cdc_emit() calls (each acquires a write lock).
2551        let engine = {
2552            let e = self.mutation_engine();
2553            if input.suppress_events {
2554                e.with_suppress_events()
2555            } else {
2556                e
2557            }
2558        };
2559        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2560            .into_iter()
2561            .map(|(fields, metadata, node_links, vector_links)| {
2562                crate::runtime::mutation::MutationRow {
2563                    fields,
2564                    metadata,
2565                    node_links,
2566                    vector_links,
2567                }
2568            })
2569            .collect();
2570
2571        let result = engine
2572            .apply(collection.clone(), mutation_rows)
2573            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2574
2575        let store = db.store();
2576        Ok(result
2577            .ids
2578            .into_iter()
2579            .map(|id| CreateEntityOutput {
2580                id,
2581                entity: store.get(&collection, id),
2582            })
2583            .collect())
2584    }
2585
2586    fn create_rows_batch_prevalidated_columnar(
2587        &self,
2588        collection: String,
2589        column_names: std::sync::Arc<Vec<String>>,
2590        rows: Vec<Vec<crate::storage::schema::Value>>,
2591    ) -> RedDBResult<usize> {
2592        create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2593            .map(|outputs| outputs.len())
2594    }
2595
2596    fn create_rows_batch_columnar(
2597        &self,
2598        collection: String,
2599        column_names: std::sync::Arc<Vec<String>>,
2600        rows: Vec<Vec<crate::storage::schema::Value>>,
2601    ) -> RedDBResult<usize> {
2602        self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2603            .map(|outputs| outputs.len())
2604    }
2605
2606    fn create_rows_batch_columnar_with_outputs(
2607        &self,
2608        collection: String,
2609        column_names: std::sync::Arc<Vec<String>>,
2610        rows: Vec<Vec<crate::storage::schema::Value>>,
2611    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2612        if rows.is_empty() {
2613            return Ok(Vec::new());
2614        }
2615        self.check_batch_size(rows.len())?;
2616        self.check_db_size()?;
2617
2618        let db = self.db();
2619        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2620        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2621
2622        // Fast path: when the collection carries no contract (or the
2623        // contract has no declared columns) `normalize_insert_fields`
2624        // is a no-op and `enforce_row_uniqueness` finds no unique
2625        // columns to check. Skip the per-row `(String, Value)` tuple
2626        // materialisation entirely and route through the prevalidated
2627        // columnar kernel — same write, CDC, and index path, just
2628        // without the wasted (String, Value) clones. This is the
2629        // bench `bench_users` shape (no contract declared by the
2630        // adapter's `setup_schema`).
2631        let needs_normalisation = match db.collection_contract(&collection) {
2632            Some(c) => {
2633                c.declared_model == crate::catalog::CollectionModel::Table
2634                    && (!c.declared_columns.is_empty()
2635                        || c.table_def
2636                            .as_ref()
2637                            .map(|t| !t.columns.is_empty())
2638                            .unwrap_or(false))
2639            }
2640            None => false,
2641        };
2642        if !needs_normalisation {
2643            return create_rows_batch_prevalidated_columnar_with_outputs(
2644                self,
2645                collection,
2646                column_names,
2647                rows,
2648            );
2649        }
2650
2651        // Slow path: contract requires per-row normalisation /
2652        // uniqueness checks. Materialise `Vec<(String, Value)>` from
2653        // the columnar layout (this still pays N×ncols `String::clone`
2654        // — but it's deferred out of the wire decoder hot loop and
2655        // happens behind a runtime API, not the per-row decode loop)
2656        // and fall through to the existing `create_rows_batch` path.
2657        let ncols = column_names.len();
2658        let tuple_rows: Vec<CreateRowInput> = rows
2659            .into_iter()
2660            .map(|values| {
2661                let mut fields: Vec<(String, crate::storage::schema::Value)> =
2662                    Vec::with_capacity(ncols);
2663                for (name, value) in column_names.iter().zip(values) {
2664                    fields.push((name.clone(), value));
2665                }
2666                CreateRowInput {
2667                    collection: collection.clone(),
2668                    fields,
2669                    metadata: Vec::new(),
2670                    node_links: Vec::new(),
2671                    vector_links: Vec::new(),
2672                }
2673            })
2674            .collect();
2675        self.create_rows_batch(CreateRowsBatchInput {
2676            collection,
2677            rows: tuple_rows,
2678            suppress_events: false,
2679        })
2680    }
2681
2682    fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2683        if input.rows.is_empty() {
2684            return Ok(0);
2685        }
2686        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2687        self.check_batch_size(input.rows.len())?;
2688        self.check_db_size()?;
2689
2690        let db = self.db();
2691        let collection = input.collection;
2692        // Still verify the collection's declared model before we blast
2693        // rows at it — this one-off check is O(1), independent of
2694        // ncols, and catches schema-kind mismatches that the client
2695        // can't always see.
2696        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2697        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2698
2699        // Hoist the per-collection default TTL lookup out of the
2700        // per-row loop — it only depends on the collection, not on
2701        // individual rows, and the old path did one HashMap read per
2702        // row (25k reads for a 25k typed_insert bulk). Fetch it once,
2703        // apply to any row whose metadata doesn't already carry a TTL.
2704        let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2705
2706        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2707            Vec::with_capacity(input.rows.len());
2708        let mut mutation_rows = mutation_rows;
2709        for row in input.rows {
2710            if row.collection != collection {
2711                return Err(crate::RedDBError::Query(format!(
2712                    "batch row collection mismatch: expected '{}', got '{}'",
2713                    collection, row.collection
2714                )));
2715            }
2716            let mut metadata = row.metadata;
2717            if let Some(ttl) = default_ttl_ms {
2718                if !has_internal_ttl_metadata(&metadata) {
2719                    metadata.push((
2720                        "_ttl_ms".to_string(),
2721                        if ttl <= i64::MAX as u64 {
2722                            MetadataValue::Int(ttl as i64)
2723                        } else {
2724                            MetadataValue::Timestamp(ttl)
2725                        },
2726                    ));
2727                }
2728            }
2729            mutation_rows.push(crate::runtime::mutation::MutationRow {
2730                fields: row.fields,
2731                metadata,
2732                node_links: row.node_links,
2733                vector_links: row.vector_links,
2734            });
2735        }
2736
2737        let engine = self.mutation_engine();
2738        let result = engine
2739            .apply(collection, mutation_rows)
2740            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2741        Ok(result.ids.len())
2742    }
2743
2744    fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2745        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2746        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2747            input.properties.iter().map(|(key, _)| key.as_str()),
2748            &format!("node '{}'", input.collection),
2749        )?;
2750        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2751        self.create_node_unchecked(input)
2752    }
2753
2754    fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2755        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2756        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2757            input.properties.iter().map(|(key, _)| key.as_str()),
2758            &format!("edge '{}'", input.collection),
2759        )?;
2760        ensure_non_tree_structural_edge_label(&input.label)?;
2761        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2762        self.create_edge_unchecked(input)
2763    }
2764
2765    fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2766        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2767        let db = self.db();
2768        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2769        contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2770        ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2771        let mut metadata = input.metadata;
2772        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2773        let mut builder = db.vector(&input.collection).dense(input.dense);
2774
2775        if let Some(content) = input.content {
2776            builder = builder.content(content);
2777        }
2778
2779        for (key, value) in metadata {
2780            builder = builder.metadata(key, value);
2781        }
2782
2783        if let Some(link_row) = input.link_row {
2784            builder = builder.link_to_table(link_row);
2785        }
2786
2787        if let Some(link_node) = input.link_node {
2788            builder = builder.link_to_node(link_node);
2789        }
2790
2791        let id = builder.save()?;
2792        let dense_for_turbo = match db.store().get(&input.collection, id).map(|e| e.data) {
2793            Some(crate::storage::unified::EntityData::Vector(data)) => Some(data.dense.clone()),
2794            _ => None,
2795        };
2796        // Phase 1.1 MVCC universal: stamp xmin on the vector so
2797        // concurrent ANN scans hide it until the transaction commits.
2798        self.stamp_xmin_if_in_txn(&input.collection, id);
2799        refresh_context_index(&db, &input.collection, id)?;
2800        // Issue #693 — vector.turbo write path. Order matters and
2801        // matches the write sequence the next slice (#673) will
2802        // attach kill-points to: WAL append → in-memory index update
2803        // → TurboExtent append. The legacy `vector` path is the
2804        // `None` branch and is untouched.
2805        if let (Some(state), Some(dense)) = (db.turbo_state(&input.collection), dense_for_turbo) {
2806            // Lazy populate so an INSERT after restart sees the
2807            // pre-restart vectors in the index before appending its
2808            // own.
2809            state.ensure_populated(&db.store(), &input.collection);
2810            // Issue #694 — named crash boundaries for #673.
2811            use crate::runtime::turbo_crash_inject::{fire, InjectionPoint};
2812            fire(InjectionPoint::BeforeWalFsync);
2813            let _ = db
2814                .store()
2815                .append_vector_insert_record(&input.collection, id.raw(), &dense);
2816            fire(InjectionPoint::BeforeIndexCommit);
2817            let mut index = state.index.lock();
2818            index.insert(id, dense.clone());
2819            // Mirror the encoded codes into the per-collection
2820            // TurboExtent when one is available. The bytes are the
2821            // 4-bit-packed code groups (`n_byte_groups`) plus the
2822            // little-endian L2 norm — same shape `BlockedCodeStorage`
2823            // appends in-memory. Failures are non-fatal in this
2824            // slice; durability of the extent is owned by #673.
2825            fire(InjectionPoint::BeforeExtentFsync);
2826            if let Some(extent) = state.extent.lock().as_mut() {
2827                let packed = index.encode_for_extent(&dense);
2828                let _ = extent.append(&packed);
2829            }
2830        }
2831        self.cdc_emit(
2832            crate::replication::cdc::ChangeOperation::Insert,
2833            &input.collection,
2834            id.raw(),
2835            "vector",
2836        );
2837        Ok(CreateEntityOutput {
2838            id,
2839            entity: db.store().get(&input.collection, id),
2840        })
2841    }
2842
2843    fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2844        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2845        let db = self.db();
2846        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2847        contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2848
2849        if let JsonValue::Object(ref map) = input.body {
2850            crate::reserved_fields::ensure_no_reserved_public_item_fields(
2851                map.keys().map(String::as_str),
2852                &format!("document '{}'", input.collection),
2853            )?;
2854        }
2855
2856        // Serialize the full body as Value::Json for the "body" field
2857        let body_bytes = json_to_vec(&input.body).map_err(|err| {
2858            crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2859        })?;
2860        let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2861            "body".to_string(),
2862            crate::storage::schema::Value::Json(body_bytes),
2863        )];
2864
2865        // Flatten top-level keys from the body into named fields for filtering
2866        if let JsonValue::Object(ref map) = input.body {
2867            for (key, value) in map {
2868                let storage_value = json_to_storage_value(value)?;
2869                fields.push((key.clone(), storage_value));
2870            }
2871        }
2872
2873        let mut metadata = input.metadata;
2874        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2875        let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2876            .iter()
2877            .map(|(key, value)| (key.as_str(), value.clone()))
2878            .collect();
2879        let mut builder = db.row(&input.collection, columns);
2880
2881        for (key, value) in metadata {
2882            builder = builder.metadata(key, value);
2883        }
2884
2885        for node in input.node_links {
2886            builder = builder.link_to_node(node);
2887        }
2888
2889        for vector in input.vector_links {
2890            builder = builder.link_to_vector(vector);
2891        }
2892
2893        let id = builder.save()?;
2894        // Phase 1.1 MVCC universal: stamp xmin on the document.
2895        self.stamp_xmin_if_in_txn(&input.collection, id);
2896        refresh_context_index(&db, &input.collection, id)?;
2897        self.cdc_emit(
2898            crate::replication::cdc::ChangeOperation::Insert,
2899            &input.collection,
2900            id.raw(),
2901            "document",
2902        );
2903        Ok(CreateEntityOutput {
2904            id,
2905            entity: db.store().get(&input.collection, id),
2906        })
2907    }
2908
2909    fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2910        let db = self.db();
2911        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2912        let declared_model = db
2913            .collection_contract(&input.collection)
2914            .map(|contract| contract.declared_model);
2915        let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2916            contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2917            self.seal_vault_value(&input.collection, input.value)?
2918        } else {
2919            contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2920            input.value
2921        };
2922        let fields = vec![
2923            (
2924                "key".to_string(),
2925                crate::storage::schema::Value::text(input.key),
2926            ),
2927            ("value".to_string(), value),
2928        ];
2929        let collection = input.collection;
2930        let result = self.mutation_engine().apply(
2931            collection.clone(),
2932            vec![crate::runtime::mutation::MutationRow {
2933                fields,
2934                metadata: input.metadata,
2935                node_links: Vec::new(),
2936                vector_links: Vec::new(),
2937            }],
2938        )?;
2939        let id = result.ids[0];
2940        Ok(CreateEntityOutput {
2941            id,
2942            entity: db.store().get(&collection, id),
2943        })
2944    }
2945
2946    fn create_timeseries_point(
2947        &self,
2948        input: CreateTimeSeriesPointInput,
2949    ) -> RedDBResult<CreateEntityOutput> {
2950        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2951        let db = self.db();
2952        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2953        contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2954
2955        let mut fields = vec![
2956            (
2957                "metric".to_string(),
2958                crate::storage::schema::Value::text(input.metric),
2959            ),
2960            (
2961                "value".to_string(),
2962                crate::storage::schema::Value::Float(input.value),
2963            ),
2964        ];
2965
2966        if let Some(timestamp_ns) = input.timestamp_ns {
2967            fields.push((
2968                "timestamp".to_string(),
2969                crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2970            ));
2971        }
2972
2973        if !input.tags.is_empty() {
2974            let tags_json = JsonValue::Object(
2975                input
2976                    .tags
2977                    .into_iter()
2978                    .map(|(key, value)| (key, JsonValue::String(value)))
2979                    .collect(),
2980            );
2981            let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2982                crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2983            })?;
2984            fields.push((
2985                "tags".to_string(),
2986                crate::storage::schema::Value::Json(tags_bytes),
2987            ));
2988        }
2989
2990        let collection = input.collection;
2991        let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2992        // Phase 1.1 MVCC universal: stamp xmin on the point so
2993        // concurrent range scans hide it until COMMIT.
2994        self.stamp_xmin_if_in_txn(&collection, id);
2995        refresh_context_index(&db, &collection, id)?;
2996
2997        Ok(CreateEntityOutput {
2998            id,
2999            entity: db.store().get(&collection, id),
3000        })
3001    }
3002
3003    fn get_kv(
3004        &self,
3005        collection: &str,
3006        key: &str,
3007    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
3008        let db = self.db();
3009        ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
3010        let store = db.store();
3011        let Some(manager) = store.get_collection(collection) else {
3012            return Ok(None);
3013        };
3014        let entities = manager.query_all(|_| true);
3015        for entity in entities {
3016            if let crate::storage::EntityData::Row(ref row) = entity.data {
3017                if let Some(ref named) = row.named {
3018                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
3019                        if &**k == key {
3020                            let value = named
3021                                .get("value")
3022                                .cloned()
3023                                .unwrap_or(crate::storage::schema::Value::Null);
3024                            return Ok(Some((value, entity.id)));
3025                        }
3026                    }
3027                }
3028            }
3029        }
3030        Ok(None)
3031    }
3032
3033    fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
3034        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3035        let found = self.get_kv(collection, key)?;
3036        if let Some((_, id)) = found {
3037            let db = self.db();
3038            let store = db.store();
3039            let deleted = store
3040                .delete(collection, id)
3041                .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3042            if deleted {
3043                store.context_index().remove_entity(id);
3044            }
3045            Ok(deleted)
3046        } else {
3047            Ok(false)
3048        }
3049    }
3050
3051    fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
3052        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3053        let PatchEntityInput {
3054            collection,
3055            id,
3056            payload,
3057            operations,
3058        } = input;
3059        let db = self.db();
3060        let store = db.store();
3061        let Some(manager) = store.get_collection(&collection) else {
3062            return Err(crate::RedDBError::NotFound(format!(
3063                "collection not found: {collection}"
3064            )));
3065        };
3066        let Some(entity) = manager.get(id) else {
3067            return Err(crate::RedDBError::NotFound(format!(
3068                "entity not found: {}",
3069                id.raw()
3070            )));
3071        };
3072        self.apply_loaded_patch_entity(collection, entity, payload, operations)
3073    }
3074
3075    fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
3076        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3077        let store = self.db().store();
3078        // Snapshot row fields before delete so we can mirror the removal
3079        // into every secondary index. The fetch is best-effort: if the
3080        // entity is already gone, the delete below is a no-op anyway.
3081        let pre_delete_fields = store
3082            .get(&input.collection, input.id)
3083            .as_ref()
3084            .map(entity_row_fields_snapshot)
3085            .unwrap_or_default();
3086        // Store delete first (source of truth). Crash between here and index removal
3087        // leaves the entity invisible to most queries but recoverable; the reverse
3088        // (remove index first, then crash) leaves the entity permanently orphaned.
3089        let deleted = store
3090            .delete(&input.collection, input.id)
3091            .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3092        if deleted {
3093            store.context_index().remove_entity(input.id);
3094            // Secondary index maintenance — surface only registry-shape
3095            // errors; missing-index removals are tolerated inside the call.
3096            if !pre_delete_fields.is_empty() {
3097                self.index_store_ref()
3098                    .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
3099                    .map_err(crate::RedDBError::Internal)?;
3100            }
3101            self.cdc_emit(
3102                crate::replication::cdc::ChangeOperation::Delete,
3103                &input.collection,
3104                input.id.raw(),
3105                "entity",
3106            );
3107        }
3108        Ok(DeleteEntityOutput {
3109            deleted,
3110            id: input.id,
3111        })
3112    }
3113}