Skip to main content

reddb_server/application/
ports_impls_entity.rs

1use std::collections::HashMap;
2
3use crate::application::entity::{
4    AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5    RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8    has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21    db: &crate::storage::unified::devx::RedDB,
22    collection: &str,
23    metadata: &mut Vec<(String, MetadataValue)>,
24) {
25    if has_internal_ttl_metadata(metadata) {
26        return;
27    }
28
29    let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30        return;
31    };
32
33    metadata.push((
34        "_ttl_ms".to_string(),
35        if default_ttl_ms <= i64::MAX as u64 {
36            MetadataValue::Int(default_ttl_ms as i64)
37        } else {
38            MetadataValue::Timestamp(default_ttl_ms)
39        },
40    ));
41}
42
43fn refresh_context_index(
44    db: &crate::storage::unified::devx::RedDB,
45    collection: &str,
46    id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48    let store = db.store();
49    let Some(entity) = store.get(collection, id) else {
50        return Ok(());
51    };
52
53    store.context_index().index_entity(collection, &entity);
54    Ok(())
55}
56
57/// Pull `(name, value)` pairs for every named column on a row entity.
58/// Returns empty if the entity is not a row, or if the row carries
59/// neither a `named` map nor a `schema` Arc — both of those mean the
60/// names aren't recoverable here, so secondary-index maintenance has
61/// nothing to act on. Used by the delete + update paths.
62pub(crate) fn entity_row_fields_snapshot(
63    entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65    let crate::storage::EntityData::Row(row) = &entity.data else {
66        return Vec::new();
67    };
68    if let Some(named) = &row.named {
69        return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70    }
71    if let Some(schema) = &row.schema {
72        return schema
73            .iter()
74            .zip(row.columns.iter())
75            .map(|(name, value)| (name.clone(), value.clone()))
76            .collect();
77    }
78    Vec::new()
79}
80
81fn ensure_collection_model_contract(
82    db: &crate::storage::unified::devx::RedDB,
83    collection: &str,
84    requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86    if let Some(contract) = db.collection_contract(collection) {
87        if !contract_enforces_model(&contract) {
88            return Ok(());
89        }
90        if collection_model_allows(contract.declared_model, requested_model) {
91            return Ok(());
92        }
93        return Err(crate::RedDBError::InvalidOperation(format!(
94            "collection '{}' is declared as '{}' and does not allow '{}' writes",
95            collection,
96            collection_model_name(contract.declared_model),
97            collection_model_name(requested_model)
98        )));
99    }
100
101    let now = implicit_contract_unix_ms();
102    db.save_collection_contract(crate::physical::CollectionContract {
103        name: collection.to_string(),
104        declared_model: requested_model,
105        schema_mode: crate::catalog::SchemaMode::Dynamic,
106        origin: crate::physical::ContractOrigin::Implicit,
107        version: 1,
108        created_at_unix_ms: now,
109        updated_at_unix_ms: now,
110        default_ttl_ms: db.collection_default_ttl_ms(collection),
111        vector_dimension: None,
112        vector_metric: None,
113        context_index_fields: Vec::new(),
114        declared_columns: Vec::new(),
115        table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116            .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117        timestamps_enabled: false,
118        context_index_enabled: false,
119        metrics_raw_retention_ms: None,
120        metrics_rollup_policies: Vec::new(),
121        metrics_tenant_identity: None,
122        metrics_namespace: None,
123        // Implicit contracts are created on first write — mutability
124        // is the default until the operator runs explicit DDL.
125        append_only: false,
126        subscriptions: Vec::new(),
127    })
128    .map(|_| ())
129    .map_err(|err| crate::RedDBError::Internal(err.to_string()))
130}
131
132fn implicit_contract_unix_ms() -> u128 {
133    std::time::SystemTime::now()
134        .duration_since(std::time::UNIX_EPOCH)
135        .unwrap_or_default()
136        .as_millis()
137}
138
139fn collection_model_allows(
140    declared_model: crate::catalog::CollectionModel,
141    requested_model: crate::catalog::CollectionModel,
142) -> bool {
143    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
144}
145
146fn ensure_vector_dimension_contract(
147    db: &crate::storage::unified::devx::RedDB,
148    collection: &str,
149    actual_dimension: usize,
150) -> RedDBResult<()> {
151    let Some(expected_dimension) = db
152        .collection_contract(collection)
153        .and_then(|contract| contract.vector_dimension)
154    else {
155        return Ok(());
156    };
157    if expected_dimension == actual_dimension {
158        return Ok(());
159    }
160    Err(crate::RedDBError::Query(format!(
161        "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
162    )))
163}
164
165fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
166    match model {
167        crate::catalog::CollectionModel::Table => "table",
168        crate::catalog::CollectionModel::Document => "document",
169        crate::catalog::CollectionModel::Graph => "graph",
170        crate::catalog::CollectionModel::Vector => "vector",
171        crate::catalog::CollectionModel::Hll => "hll",
172        crate::catalog::CollectionModel::Sketch => "sketch",
173        crate::catalog::CollectionModel::Filter => "filter",
174        crate::catalog::CollectionModel::Kv => "kv",
175        crate::catalog::CollectionModel::Config => "config",
176        crate::catalog::CollectionModel::Vault => "vault",
177        crate::catalog::CollectionModel::Mixed => "mixed",
178        crate::catalog::CollectionModel::TimeSeries => "timeseries",
179        crate::catalog::CollectionModel::Queue => "queue",
180        crate::catalog::CollectionModel::Metrics => "metrics",
181    }
182}
183
184#[derive(Clone)]
185struct UniquenessRule {
186    name: String,
187    columns: Vec<String>,
188    primary_key: bool,
189}
190
191#[derive(Copy, Clone, PartialEq, Eq)]
192enum NormalizeMode {
193    /// First write for this row. Timestamps auto-filled from now on
194    /// both `created_at` and `updated_at`; user attempts to set
195    /// either column are rejected.
196    Insert,
197    /// Update/patch path. `created_at` is preserved from the existing
198    /// row (immutable after insert); `updated_at` is bumped to now.
199    /// User attempts to set either via the patch are rejected.
200    Update,
201}
202
203mod collection_contract_enforcement {
204    use super::*;
205
206    pub(super) struct CollectionContractWriteEnforcer<'a> {
207        db: &'a crate::storage::unified::devx::RedDB,
208        collection: &'a str,
209    }
210
211    impl<'a> CollectionContractWriteEnforcer<'a> {
212        pub(super) fn new(
213            db: &'a crate::storage::unified::devx::RedDB,
214            collection: &'a str,
215        ) -> Self {
216            Self { db, collection }
217        }
218
219        pub(super) fn ensure_model(
220            &self,
221            requested_model: crate::catalog::CollectionModel,
222        ) -> RedDBResult<()> {
223            ensure_collection_model_contract(self.db, self.collection, requested_model)
224        }
225
226        pub(super) fn normalize_insert_fields(
227            &self,
228            fields: Vec<(String, Value)>,
229        ) -> RedDBResult<Vec<(String, Value)>> {
230            normalize_row_fields_for_contract_with_mode(
231                self.db,
232                self.collection,
233                fields,
234                NormalizeMode::Insert,
235            )
236        }
237
238        pub(super) fn normalize_update_fields(
239            &self,
240            fields: Vec<(String, Value)>,
241        ) -> RedDBResult<Vec<(String, Value)>> {
242            normalize_row_fields_for_contract_with_mode(
243                self.db,
244                self.collection,
245                fields,
246                NormalizeMode::Update,
247            )
248        }
249
250        pub(super) fn enforce_row_uniqueness(
251            &self,
252            fields: &[(String, Value)],
253            exclude_id: Option<crate::storage::EntityId>,
254        ) -> RedDBResult<()> {
255            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
256        }
257
258        pub(super) fn enforce_batch_uniqueness(
259            &self,
260            rows: &[Vec<(String, Value)>],
261        ) -> RedDBResult<()> {
262            enforce_row_batch_uniqueness(self.db, self.collection, rows)
263        }
264
265        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
266            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
267        }
268    }
269}
270
271use collection_contract_enforcement::CollectionContractWriteEnforcer;
272
273fn normalize_row_fields_for_contract_with_mode(
274    db: &crate::storage::unified::devx::RedDB,
275    collection: &str,
276    fields: Vec<(String, Value)>,
277    mode: NormalizeMode,
278) -> RedDBResult<Vec<(String, Value)>> {
279    let Some(contract) = db.collection_contract(collection) else {
280        return Ok(fields);
281    };
282
283    if contract.declared_model != crate::catalog::CollectionModel::Table
284        || (contract.declared_columns.is_empty()
285            && contract
286                .table_def
287                .as_ref()
288                .map(|table| table.columns.is_empty())
289                .unwrap_or(true))
290    {
291        return Ok(fields);
292    }
293
294    // Capture the pre-normalize value of created_at (if present) so
295    // Update mode can preserve it. Also capture updated_at to detect
296    // user attempts to set it via the patch payload.
297    //
298    // Heuristic for Update mode: if fields ALREADY contains a
299    // `created_at` whose value matches the row's on-disk entity, the
300    // caller is the patch pipeline carrying forward an auto-populated
301    // column — not a user mutation. Allow pass-through in that case,
302    // then restore the original value at the end.
303    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
304        fields
305            .iter()
306            .find(|(n, _)| n == "created_at")
307            .map(|(_, v)| v.clone())
308    } else {
309        None
310    };
311
312    // Reject user attempts to set runtime-managed timestamp columns.
313    // On Insert we reject any mention; on Update we only reject when
314    // the patch pipeline handed us a NEW value (not the one we
315    // auto-populated during the last insert).
316    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
317        for (name, _) in &fields {
318            if name == "created_at" || name == "updated_at" {
319                return Err(crate::RedDBError::Query(format!(
320                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
321                    collection, name
322                )));
323            }
324        }
325    }
326
327    let mut provided = std::collections::BTreeMap::new();
328    for (name, value) in &fields {
329        provided.insert(name.clone(), value.clone());
330    }
331
332    let resolved_columns = resolved_contract_columns(&contract)?;
333    let declared_names: std::collections::BTreeSet<String> = resolved_columns
334        .iter()
335        .map(|column| column.name.clone())
336        .collect();
337    let unknown_fields: Vec<String> = fields
338        .iter()
339        .filter(|(name, _)| !declared_names.contains(name))
340        .map(|(name, _)| name.clone())
341        .collect();
342    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
343        && !unknown_fields.is_empty()
344    {
345        return Err(crate::RedDBError::Query(format!(
346            "collection '{}' is strict and does not allow undeclared fields: {}",
347            collection,
348            unknown_fields.join(", ")
349        )));
350    }
351    let mut normalized = Vec::new();
352    let now_ms = current_unix_ms_u64();
353
354    for column in &resolved_columns {
355        match provided.remove(&column.name) {
356            Some(value) => {
357                // Runtime-managed columns on Update: always overwrite
358                // with the runtime's own value (preserved created_at
359                // or fresh updated_at). User mutations are silently
360                // discarded because we reject them earlier.
361                if contract.timestamps_enabled && mode == NormalizeMode::Update {
362                    match column.name.as_str() {
363                        "created_at" => {
364                            normalized.push((
365                                column.name.clone(),
366                                existing_created_at
367                                    .clone()
368                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
369                            ));
370                            continue;
371                        }
372                        "updated_at" => {
373                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
374                            continue;
375                        }
376                        _ => {}
377                    }
378                }
379                normalized.push((
380                    column.name.clone(),
381                    normalize_contract_value(collection, column, value)?,
382                ));
383            }
384            None => {
385                // Runtime-managed timestamp columns: auto-fill with now
386                // when the contract opted in. Both get the same value on
387                // first insert so callers can order by either.
388                if contract.timestamps_enabled
389                    && (column.name == "created_at" || column.name == "updated_at")
390                {
391                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
392                    continue;
393                }
394                if let Some(default) = &column.default {
395                    normalized.push((
396                        column.name.clone(),
397                        coerce_contract_literal(collection, &column.name, column, default)?,
398                    ));
399                } else if column.not_null {
400                    return Err(crate::RedDBError::Query(format!(
401                        "missing required column '{}' for collection '{}'",
402                        column.name, collection
403                    )));
404                }
405            }
406        }
407    }
408
409    for (name, value) in fields {
410        if !declared_names.contains(&name) {
411            normalized.push((name, value));
412        }
413    }
414
415    Ok(normalized)
416}
417
418fn current_unix_ms_u64() -> u64 {
419    std::time::SystemTime::now()
420        .duration_since(std::time::UNIX_EPOCH)
421        .map(|d| d.as_millis() as u64)
422        .unwrap_or(0)
423}
424
425fn enforce_row_uniqueness(
426    db: &crate::storage::unified::devx::RedDB,
427    collection: &str,
428    fields: &[(String, Value)],
429    exclude_id: Option<crate::storage::EntityId>,
430) -> RedDBResult<()> {
431    let Some(contract) = db.collection_contract(collection) else {
432        return Ok(());
433    };
434    if contract.declared_model != crate::catalog::CollectionModel::Table
435        && contract.declared_model != crate::catalog::CollectionModel::Mixed
436    {
437        return Ok(());
438    }
439
440    let rules = resolved_uniqueness_rules(&contract);
441    if rules.is_empty() {
442        return Ok(());
443    }
444
445    let store = db.store();
446    let Some(manager) = store.get_collection(collection) else {
447        return Ok(());
448    };
449
450    let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
451
452    for rule in &rules {
453        let mut expected_signatures = Vec::new();
454        let mut skip_rule = false;
455
456        for column in &rule.columns {
457            match input_fields.get(column) {
458                Some(Value::Null) | None if rule.primary_key => {
459                    return Err(crate::RedDBError::Query(format!(
460                        "primary key '{}' in collection '{}' requires non-null column '{}'",
461                        rule.name, collection, column
462                    )))
463                }
464                Some(Value::Null) | None => {
465                    skip_rule = true;
466                    break;
467                }
468                Some(value) => {
469                    expected_signatures.push((column.clone(), value_signature(value)));
470                }
471            }
472        }
473
474        if skip_rule {
475            continue;
476        }
477
478        for entity in manager.query_all(|_| true) {
479            if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
480                continue;
481            }
482            let Some(existing_fields) = row_fields_from_entity(&entity) else {
483                continue;
484            };
485
486            let duplicate = expected_signatures.iter().all(|(column, expected)| {
487                existing_fields
488                    .get(column)
489                    .map(|value| value_signature(value) == *expected)
490                    .unwrap_or(false)
491            });
492
493            if duplicate {
494                let qualifier = if rule.primary_key {
495                    "primary key"
496                } else {
497                    "unique constraint"
498                };
499                return Err(crate::RedDBError::Query(format!(
500                    "{} '{}' violated on collection '{}' for columns [{}]",
501                    qualifier,
502                    rule.name,
503                    collection,
504                    rule.columns.join(", ")
505                )));
506            }
507        }
508    }
509
510    Ok(())
511}
512
513fn enforce_row_batch_uniqueness(
514    db: &crate::storage::unified::devx::RedDB,
515    collection: &str,
516    rows: &[Vec<(String, Value)>],
517) -> RedDBResult<()> {
518    let Some(contract) = db.collection_contract(collection) else {
519        return Ok(());
520    };
521    if contract.declared_model != crate::catalog::CollectionModel::Table
522        && contract.declared_model != crate::catalog::CollectionModel::Mixed
523    {
524        return Ok(());
525    }
526
527    let rules = resolved_uniqueness_rules(&contract);
528    if rules.is_empty() {
529        return Ok(());
530    }
531
532    for rule in &rules {
533        let mut seen = std::collections::HashMap::<String, usize>::new();
534        for (row_index, fields) in rows.iter().enumerate() {
535            let input_fields: std::collections::BTreeMap<String, Value> =
536                fields.iter().cloned().collect();
537            let mut signatures = Vec::new();
538            let mut skip_rule = false;
539
540            for column in &rule.columns {
541                match input_fields.get(column) {
542                    Some(Value::Null) | None if rule.primary_key => {
543                        return Err(crate::RedDBError::Query(format!(
544                            "primary key '{}' in collection '{}' requires non-null column '{}'",
545                            rule.name, collection, column
546                        )))
547                    }
548                    Some(Value::Null) | None => {
549                        skip_rule = true;
550                        break;
551                    }
552                    Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
553                }
554            }
555
556            if skip_rule {
557                continue;
558            }
559
560            let signature = signatures.join("|");
561            if let Some(previous_index) = seen.insert(signature, row_index) {
562                return Err(crate::RedDBError::Query(format!(
563                    "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
564                    rule.name,
565                    collection,
566                    previous_index + 1,
567                    row_index + 1
568                )));
569            }
570        }
571    }
572
573    Ok(())
574}
575
576fn row_update_requires_uniqueness_check(
577    db: &crate::storage::unified::devx::RedDB,
578    collection: &str,
579    modified_columns: &[String],
580) -> bool {
581    if modified_columns.is_empty() {
582        return false;
583    }
584
585    let Some(contract) = db.collection_contract(collection) else {
586        return false;
587    };
588    if contract.declared_model != crate::catalog::CollectionModel::Table
589        && contract.declared_model != crate::catalog::CollectionModel::Mixed
590    {
591        return false;
592    }
593
594    let rules = resolved_uniqueness_rules(&contract);
595    if rules.is_empty() {
596        return false;
597    }
598
599    rules.iter().any(|rule| {
600        rule.columns.iter().any(|column| {
601            modified_columns
602                .iter()
603                .any(|modified| modified.eq_ignore_ascii_case(column))
604        })
605    })
606}
607
608pub(crate) fn build_row_update_contract_plan(
609    db: &crate::storage::unified::devx::RedDB,
610    collection: &str,
611) -> RedDBResult<Option<RowUpdateContractPlan>> {
612    let Some(contract) = db.collection_contract(collection) else {
613        return Ok(None);
614    };
615
616    let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
617        && !(contract.declared_columns.is_empty()
618            && contract
619                .table_def
620                .as_ref()
621                .map(|table| table.columns.is_empty())
622                .unwrap_or(true))
623    {
624        resolved_contract_columns(&contract)?
625            .into_iter()
626            .map(|rule| {
627                (
628                    rule.name.clone(),
629                    RowUpdateColumnRule {
630                        name: rule.name,
631                        data_type: rule.data_type,
632                        data_type_name: rule.data_type_name,
633                        not_null: rule.not_null,
634                        enum_variants: rule.enum_variants,
635                    },
636                )
637            })
638            .collect()
639    } else {
640        HashMap::new()
641    };
642
643    let unique_columns = if matches!(
644        contract.declared_model,
645        crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
646    ) {
647        resolved_uniqueness_rules(&contract)
648            .into_iter()
649            .flat_map(|rule| rule.columns.into_iter())
650            .map(|column| (column, ()))
651            .collect()
652    } else {
653        HashMap::new()
654    };
655
656    Ok(Some(RowUpdateContractPlan {
657        timestamps_enabled: contract.timestamps_enabled,
658        strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
659        declared_rules,
660        unique_columns,
661    }))
662}
663
664pub(crate) fn normalize_row_update_assignment_with_plan(
665    collection: &str,
666    column: &str,
667    value: Value,
668    row_contract_plan: Option<&RowUpdateContractPlan>,
669) -> RedDBResult<Value> {
670    let Some(plan) = row_contract_plan else {
671        return Ok(value);
672    };
673
674    if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
675        return Err(crate::RedDBError::Query(format!(
676            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
677            collection, column
678        )));
679    }
680
681    if let Some(rule) = plan.declared_rules.get(column) {
682        let rule = ResolvedColumnRule {
683            name: rule.name.clone(),
684            data_type: rule.data_type,
685            data_type_name: rule.data_type_name.clone(),
686            not_null: rule.not_null,
687            default: None,
688            enum_variants: rule.enum_variants.clone(),
689        };
690        normalize_contract_value(collection, &rule, value)
691    } else if plan.strict_schema {
692        Err(crate::RedDBError::Query(format!(
693            "collection '{}' is strict and does not allow undeclared fields: {}",
694            collection, column
695        )))
696    } else {
697        Ok(value)
698    }
699}
700
701pub(crate) fn normalize_row_update_value_for_rule(
702    collection: &str,
703    value: Value,
704    row_rule: Option<&RowUpdateColumnRule>,
705) -> RedDBResult<Value> {
706    let Some(rule) = row_rule else {
707        return Ok(value);
708    };
709
710    let rule = ResolvedColumnRule {
711        name: rule.name.clone(),
712        data_type: rule.data_type,
713        data_type_name: rule.data_type_name.clone(),
714        not_null: rule.not_null,
715        default: None,
716        enum_variants: rule.enum_variants.clone(),
717    };
718    normalize_contract_value(collection, &rule, value)
719}
720
721fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
722    if let Some(named) = row.named.as_mut() {
723        named.insert(name.to_string(), value);
724        return;
725    }
726
727    if let Some(schema) = row.schema.as_ref() {
728        if let Some(index) = schema.iter().position(|column| column == name) {
729            if let Some(slot) = row.columns.get_mut(index) {
730                *slot = value;
731                return;
732            }
733        }
734
735        let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
736        for (column, current) in schema.iter().zip(row.columns.iter()) {
737            named.insert(column.clone(), current.clone());
738        }
739        named.insert(name.to_string(), value);
740        row.named = Some(named);
741        return;
742    }
743
744    let mut named = HashMap::with_capacity(1);
745    named.insert(name.to_string(), value);
746    row.named = Some(named);
747}
748
749fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
750    if let Some(named) = row.named.as_ref() {
751        named
752            .iter()
753            .map(|(key, value)| (key.clone(), value.clone()))
754            .collect()
755    } else if let Some(schema) = row.schema.as_ref() {
756        schema
757            .iter()
758            .cloned()
759            .zip(row.columns.iter().cloned())
760            .collect()
761    } else {
762        Vec::new()
763    }
764}
765
766fn apply_row_field_assignments_raw<I>(
767    row: &mut crate::storage::unified::entity::RowData,
768    field_assignments: I,
769) where
770    I: IntoIterator<Item = (String, Value)>,
771{
772    for (column, value) in field_assignments {
773        set_row_field(row, &column, value);
774    }
775}
776
777fn apply_row_field_assignments_incremental<I>(
778    collection: &str,
779    row: &mut crate::storage::unified::entity::RowData,
780    field_assignments: I,
781    row_contract_plan: Option<&RowUpdateContractPlan>,
782) -> RedDBResult<()>
783where
784    I: IntoIterator<Item = (String, Value)>,
785{
786    for (column, value) in field_assignments {
787        let value = normalize_row_update_assignment_with_plan(
788            collection,
789            &column,
790            value,
791            row_contract_plan,
792        )?;
793
794        set_row_field(row, &column, value);
795    }
796
797    Ok(())
798}
799
800fn resolved_uniqueness_rules(
801    contract: &crate::physical::CollectionContract,
802) -> Vec<UniquenessRule> {
803    let mut rules = Vec::new();
804
805    if let Some(table_def) = &contract.table_def {
806        if !table_def.primary_key.is_empty() {
807            rules.push(UniquenessRule {
808                name: "primary_key".to_string(),
809                columns: table_def.primary_key.clone(),
810                primary_key: true,
811            });
812        }
813
814        for constraint in &table_def.constraints {
815            if matches!(
816                constraint.constraint_type,
817                crate::storage::schema::ConstraintType::PrimaryKey
818            ) && !constraint.columns.is_empty()
819            {
820                rules.push(UniquenessRule {
821                    name: constraint.name.clone(),
822                    columns: constraint.columns.clone(),
823                    primary_key: true,
824                });
825            } else if matches!(
826                constraint.constraint_type,
827                crate::storage::schema::ConstraintType::Unique
828            ) && !constraint.columns.is_empty()
829            {
830                rules.push(UniquenessRule {
831                    name: constraint.name.clone(),
832                    columns: constraint.columns.clone(),
833                    primary_key: false,
834                });
835            }
836        }
837    } else {
838        for column in &contract.declared_columns {
839            if column.primary_key {
840                rules.push(UniquenessRule {
841                    name: format!("pk_{}", column.name),
842                    columns: vec![column.name.clone()],
843                    primary_key: true,
844                });
845            } else if column.unique {
846                rules.push(UniquenessRule {
847                    name: format!("uniq_{}", column.name),
848                    columns: vec![column.name.clone()],
849                    primary_key: false,
850                });
851            }
852        }
853    }
854
855    let mut dedup = std::collections::BTreeSet::new();
856    rules
857        .into_iter()
858        .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
859        .collect()
860}
861
862fn row_fields_from_entity(
863    entity: &crate::storage::UnifiedEntity,
864) -> Option<std::collections::BTreeMap<String, Value>> {
865    match &entity.data {
866        crate::storage::EntityData::Row(row) => {
867            if let Some(named) = &row.named {
868                Some(
869                    named
870                        .iter()
871                        .map(|(key, value)| (key.clone(), value.clone()))
872                        .collect(),
873                )
874            } else {
875                row.schema.as_ref().map(|schema| {
876                    schema
877                        .iter()
878                        .cloned()
879                        .zip(row.columns.iter().cloned())
880                        .collect()
881                })
882            }
883        }
884        _ => None,
885    }
886}
887
888fn value_signature(value: &Value) -> String {
889    format!("{value:?}")
890}
891
892fn normalize_contract_value(
893    collection: &str,
894    column: &ResolvedColumnRule,
895    value: Value,
896) -> RedDBResult<Value> {
897    if matches!(value, Value::Null) {
898        if column.not_null {
899            return Err(crate::RedDBError::Query(format!(
900                "column '{}' in collection '{}' cannot be null",
901                column.name, collection
902            )));
903        }
904        return Ok(Value::Null);
905    }
906
907    let target = column.data_type;
908    if value_matches_declared_type(&value, target) {
909        return Ok(value);
910    }
911
912    let Some(raw) = value_to_coercion_input(&value) else {
913        return Err(crate::RedDBError::Query(format!(
914            "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
915            column.name, collection, column.data_type_name
916        )));
917    };
918
919    coerce_contract_literal(collection, &column.name, column, &raw)
920}
921
922fn coerce_contract_literal(
923    collection: &str,
924    column_name: &str,
925    column: &ResolvedColumnRule,
926    raw: &str,
927) -> RedDBResult<Value> {
928    let target = column.data_type;
929    match target {
930        DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
931        DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
932        DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
933            crate::RedDBError::Query(format!(
934                "failed to coerce column '{}' in collection '{}' to '{}': {}",
935                column_name, collection, column.data_type_name, err
936            ))
937        }),
938        DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
939            crate::RedDBError::Query(format!(
940                "failed to coerce column '{}' in collection '{}' to '{}': {}",
941                column_name, collection, column.data_type_name, err
942            ))
943        }),
944        DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
945            "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
946            column_name, collection, column.data_type_name
947        ))),
948        _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
949            |err| {
950                crate::RedDBError::Query(format!(
951                    "failed to coerce column '{}' in collection '{}' to '{}': {}",
952                    column_name, collection, column.data_type_name, err
953                ))
954            },
955        ),
956    }
957}
958
959struct ResolvedColumnRule {
960    name: String,
961    data_type: DataType,
962    data_type_name: String,
963    not_null: bool,
964    default: Option<String>,
965    enum_variants: Vec<String>,
966}
967
968fn resolved_contract_columns(
969    contract: &crate::physical::CollectionContract,
970) -> RedDBResult<Vec<ResolvedColumnRule>> {
971    if let Some(table_def) = &contract.table_def {
972        return Ok(table_def
973            .columns
974            .iter()
975            .map(|column| ResolvedColumnRule {
976                name: column.name.clone(),
977                data_type: column.data_type,
978                data_type_name: data_type_name(column.data_type).to_string(),
979                not_null: !column.nullable,
980                default: column
981                    .default
982                    .as_ref()
983                    .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
984                enum_variants: column.enum_variants.clone(),
985            })
986            .collect());
987    }
988
989    contract
990        .declared_columns
991        .iter()
992        .map(|column| {
993            let data_type = column
994                .sql_type
995                .as_ref()
996                .map(crate::storage::query::resolve_sql_type_name)
997                .transpose()
998                .map_err(|err| crate::RedDBError::Query(err.to_string()))?
999                .unwrap_or(parse_declared_data_type(&column.data_type)?);
1000            Ok(ResolvedColumnRule {
1001                name: column.name.clone(),
1002                data_type,
1003                data_type_name: column.data_type.clone(),
1004                not_null: column.not_null,
1005                default: column.default.clone(),
1006                enum_variants: column.enum_variants.clone(),
1007            })
1008        })
1009        .collect()
1010}
1011
1012fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1013    resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1014}
1015
1016fn data_type_name(data_type: DataType) -> &'static str {
1017    match data_type {
1018        DataType::Integer => "integer",
1019        DataType::UnsignedInteger => "unsigned_integer",
1020        DataType::Float => "float",
1021        DataType::Text => "text",
1022        DataType::Blob => "blob",
1023        DataType::Boolean => "boolean",
1024        DataType::Timestamp => "timestamp",
1025        DataType::Duration => "duration",
1026        DataType::IpAddr => "ipaddr",
1027        DataType::MacAddr => "macaddr",
1028        DataType::Vector => "vector",
1029        DataType::Nullable => "nullable",
1030        DataType::Unknown => "unknown",
1031        DataType::Json => "json",
1032        DataType::Uuid => "uuid",
1033        DataType::NodeRef => "noderef",
1034        DataType::EdgeRef => "edgeref",
1035        DataType::VectorRef => "vectorref",
1036        DataType::RowRef => "rowref",
1037        DataType::Color => "color",
1038        DataType::Email => "email",
1039        DataType::Url => "url",
1040        DataType::Phone => "phone",
1041        DataType::Semver => "semver",
1042        DataType::Cidr => "cidr",
1043        DataType::Date => "date",
1044        DataType::Time => "time",
1045        DataType::Decimal => "decimal",
1046        DataType::Enum => "enum",
1047        DataType::Array => "array",
1048        DataType::TimestampMs => "timestamp_ms",
1049        DataType::Ipv4 => "ipv4",
1050        DataType::Ipv6 => "ipv6",
1051        DataType::Subnet => "subnet",
1052        DataType::Port => "port",
1053        DataType::Latitude => "latitude",
1054        DataType::Longitude => "longitude",
1055        DataType::GeoPoint => "geopoint",
1056        DataType::Country2 => "country2",
1057        DataType::Country3 => "country3",
1058        DataType::Lang2 => "lang2",
1059        DataType::Lang5 => "lang5",
1060        DataType::Currency => "currency",
1061        DataType::AssetCode => "asset_code",
1062        DataType::Money => "money",
1063        DataType::ColorAlpha => "color_alpha",
1064        DataType::BigInt => "bigint",
1065        DataType::KeyRef => "keyref",
1066        DataType::DocRef => "docref",
1067        DataType::TableRef => "tableref",
1068        DataType::PageRef => "pageref",
1069        DataType::Secret => "secret",
1070        DataType::Password => "password",
1071        DataType::TextZstd => "text",
1072        DataType::BlobZstd => "blob",
1073    }
1074}
1075
1076fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1077    matches!(
1078        (value, target),
1079        (Value::Null, _)
1080            | (Value::Integer(_), DataType::Integer)
1081            | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1082            | (Value::Float(_), DataType::Float)
1083            | (Value::Text(_), DataType::Text)
1084            | (Value::Blob(_), DataType::Blob)
1085            | (Value::Boolean(_), DataType::Boolean)
1086            | (Value::Timestamp(_), DataType::Timestamp)
1087            | (Value::Duration(_), DataType::Duration)
1088            | (Value::IpAddr(_), DataType::IpAddr)
1089            | (Value::MacAddr(_), DataType::MacAddr)
1090            | (Value::Vector(_), DataType::Vector)
1091            | (Value::Json(_), DataType::Json)
1092            | (Value::Uuid(_), DataType::Uuid)
1093            | (Value::NodeRef(_), DataType::NodeRef)
1094            | (Value::EdgeRef(_), DataType::EdgeRef)
1095            | (Value::VectorRef(_, _), DataType::VectorRef)
1096            | (Value::RowRef(_, _), DataType::RowRef)
1097            | (Value::Color(_), DataType::Color)
1098            | (Value::Email(_), DataType::Email)
1099            | (Value::Url(_), DataType::Url)
1100            | (Value::Phone(_), DataType::Phone)
1101            | (Value::Semver(_), DataType::Semver)
1102            | (Value::Cidr(_, _), DataType::Cidr)
1103            | (Value::Date(_), DataType::Date)
1104            | (Value::Time(_), DataType::Time)
1105            | (Value::Decimal(_), DataType::Decimal)
1106            | (Value::EnumValue(_), DataType::Enum)
1107            | (Value::Array(_), DataType::Array)
1108            | (Value::TimestampMs(_), DataType::TimestampMs)
1109            | (Value::Ipv4(_), DataType::Ipv4)
1110            | (Value::Ipv6(_), DataType::Ipv6)
1111            | (Value::Subnet(_, _), DataType::Subnet)
1112            | (Value::Port(_), DataType::Port)
1113            | (Value::Latitude(_), DataType::Latitude)
1114            | (Value::Longitude(_), DataType::Longitude)
1115            | (Value::GeoPoint(_, _), DataType::GeoPoint)
1116            | (Value::Country2(_), DataType::Country2)
1117            | (Value::Country3(_), DataType::Country3)
1118            | (Value::Lang2(_), DataType::Lang2)
1119            | (Value::Lang5(_), DataType::Lang5)
1120            | (Value::Currency(_), DataType::Currency)
1121            | (Value::ColorAlpha(_), DataType::ColorAlpha)
1122            | (Value::BigInt(_), DataType::BigInt)
1123            | (Value::KeyRef(_, _), DataType::KeyRef)
1124            | (Value::DocRef(_, _), DataType::DocRef)
1125            | (Value::TableRef(_), DataType::TableRef)
1126            | (Value::PageRef(_), DataType::PageRef)
1127            | (Value::Secret(_), DataType::Secret)
1128            | (Value::Password(_), DataType::Password)
1129    )
1130}
1131
1132fn value_to_coercion_input(value: &Value) -> Option<String> {
1133    match value {
1134        Value::Null => None,
1135        Value::Integer(value) => Some(value.to_string()),
1136        Value::UnsignedInteger(value) => Some(value.to_string()),
1137        Value::Float(value) => Some(value.to_string()),
1138        Value::Text(value) => Some(value.to_string()),
1139        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1140        Value::Boolean(value) => Some(value.to_string()),
1141        Value::Timestamp(value) => Some(value.to_string()),
1142        Value::Duration(value) => Some(value.to_string()),
1143        Value::IpAddr(value) => Some(value.to_string()),
1144        Value::MacAddr(value) => Some(format!(
1145            "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1146            value[0], value[1], value[2], value[3], value[4], value[5]
1147        )),
1148        Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1149        Value::Email(value) => Some(value.clone()),
1150        Value::Url(value) => Some(value.clone()),
1151        Value::Phone(value) => Some(value.to_string()),
1152        Value::Semver(value) => Some(format!(
1153            "{}.{}.{}",
1154            value / 1_000_000,
1155            (value / 1_000) % 1_000,
1156            value % 1_000
1157        )),
1158        Value::Date(value) => Some(value.to_string()),
1159        Value::Time(value) => Some(value.to_string()),
1160        Value::Decimal(value) => Some(value.to_string()),
1161        Value::TimestampMs(value) => Some(value.to_string()),
1162        Value::Ipv4(value) => Some(format!(
1163            "{}.{}.{}.{}",
1164            (value >> 24) & 0xFF,
1165            (value >> 16) & 0xFF,
1166            (value >> 8) & 0xFF,
1167            value & 0xFF
1168        )),
1169        Value::Port(value) => Some(value.to_string()),
1170        Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1171        Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1172        Value::GeoPoint(lat, lon) => Some(format!(
1173            "{},{}",
1174            *lat as f64 / 1_000_000.0,
1175            *lon as f64 / 1_000_000.0
1176        )),
1177        Value::BigInt(value) => Some(value.to_string()),
1178        Value::TableRef(value) => Some(value.clone()),
1179        Value::PageRef(value) => Some(value.to_string()),
1180        Value::Password(value) => Some(value.clone()),
1181        _ => None,
1182    }
1183}
1184
1185fn dedupe_modified_columns(mut modified_columns: Vec<String>) -> Vec<String> {
1186    if modified_columns.is_empty() {
1187        return modified_columns;
1188    }
1189
1190    let mut unique = Vec::with_capacity(modified_columns.len());
1191    for column in modified_columns.drain(..) {
1192        if !unique
1193            .iter()
1194            .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1195        {
1196            unique.push(column);
1197        }
1198    }
1199    unique
1200}
1201
1202fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1203    if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1204        return Err(crate::RedDBError::Query(
1205            "array positional document patch paths are unsupported; replace the array or full document body instead"
1206                .to_string(),
1207        ));
1208    }
1209    Ok(())
1210}
1211
1212fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1213    match fields.get("body") {
1214        Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1215            crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1216        }),
1217        Some(_) => Err(crate::RedDBError::Query(
1218            "document body field must contain JSON".to_string(),
1219        )),
1220        None => Ok(JsonValue::Object(Default::default())),
1221    }
1222}
1223
1224fn replace_document_row_body(
1225    fields: &mut HashMap<String, Value>,
1226    body: JsonValue,
1227    modified_columns: &mut Vec<String>,
1228) -> RedDBResult<()> {
1229    modified_columns.push("body".to_string());
1230
1231    let old_keys: Vec<String> = fields
1232        .keys()
1233        .filter(|key| key.as_str() != "body")
1234        .cloned()
1235        .collect();
1236    for key in old_keys {
1237        fields.remove(&key);
1238        modified_columns.push(key);
1239    }
1240
1241    let body_bytes = json_to_vec(&body).map_err(|err| {
1242        crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1243    })?;
1244    fields.insert("body".to_string(), Value::Json(body_bytes));
1245
1246    if let JsonValue::Object(map) = &body {
1247        for (key, value) in map {
1248            fields.insert(key.clone(), json_to_storage_value(value)?);
1249            modified_columns.push(key.clone());
1250        }
1251    }
1252
1253    Ok(())
1254}
1255
1256impl RedDBRuntime {
1257    pub(crate) fn apply_loaded_patch_entity_core(
1258        &self,
1259        collection: String,
1260        mut entity: crate::storage::UnifiedEntity,
1261        payload: JsonValue,
1262        operations: Vec<PatchEntityOperation>,
1263    ) -> RedDBResult<AppliedEntityMutation> {
1264        let id = entity.id;
1265        let operations = normalize_ttl_patch_operations(operations)?;
1266        // Snapshot pre-patch row fields for the secondary-index hook —
1267        // empty for non-row entities, which is the desired no-op.
1268        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1269
1270        let db = self.db();
1271        let store = db.store();
1272        let Some(manager) = store.get_collection(&collection) else {
1273            return Err(crate::RedDBError::NotFound(format!(
1274                "collection not found: {collection}"
1275            )));
1276        };
1277
1278        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1279        let mut metadata_changed = false;
1280        let mut modified_columns: Vec<String> = Vec::new();
1281        let mut context_index_dirty = false;
1282        let mut graph_node_type: Option<String> = None;
1283        let mut graph_edge_weight: Option<f32> = None;
1284
1285        let row_contract_timestamps = db
1286            .collection_contract(&collection)
1287            .map(|c| c.timestamps_enabled)
1288            .unwrap_or(false);
1289
1290        match &mut entity.data {
1291            crate::storage::EntityData::Row(row) => {
1292                let is_document_collection = db
1293                    .collection_contract(&collection)
1294                    .map(|contract| {
1295                        contract.declared_model == crate::catalog::CollectionModel::Document
1296                    })
1297                    .unwrap_or(false);
1298                let mut field_ops = Vec::new();
1299                let mut metadata_ops = Vec::new();
1300                let mut document_body_ops = Vec::new();
1301
1302                for mut op in operations {
1303                    let Some(root) = op.path.first().map(String::as_str) else {
1304                        return Err(crate::RedDBError::Query(
1305                            "patch path cannot be empty".to_string(),
1306                        ));
1307                    };
1308
1309                    match root {
1310                        "body" if is_document_collection => {
1311                            if op.path.len() < 2 {
1312                                return Err(crate::RedDBError::Query(
1313                                    "document body patch paths require a nested key; use payload.body for full replacement"
1314                                        .to_string(),
1315                                ));
1316                            }
1317                            op.path.remove(0);
1318                            reject_document_array_position_path(&op.path)?;
1319                            document_body_ops.push(op);
1320                        }
1321                        "fields" | "named" => {
1322                            if op.path.len() < 2 {
1323                                return Err(crate::RedDBError::Query(
1324                                    "patch path 'fields' requires a nested key".to_string(),
1325                                ));
1326                            }
1327                            if row_contract_timestamps {
1328                                let leaf = op.path.get(1).map(String::as_str);
1329                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
1330                                    return Err(crate::RedDBError::Query(format!(
1331                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1332                                        collection,
1333                                        leaf.unwrap_or("")
1334                                    )));
1335                                }
1336                            }
1337                            op.path.remove(0);
1338                            field_ops.push(op);
1339                        }
1340                        "metadata" => {
1341                            if op.path.len() < 2 {
1342                                return Err(crate::RedDBError::Query(
1343                                    "patch path 'metadata' requires a nested key".to_string(),
1344                                ));
1345                            }
1346                            op.path.remove(0);
1347                            metadata_ops.push(op);
1348                        }
1349                        _ => {
1350                            return Err(crate::RedDBError::Query(format!(
1351                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1352                            )));
1353                        }
1354                    }
1355                }
1356
1357                if !document_body_ops.is_empty() {
1358                    context_index_dirty = true;
1359                    let named = row.named.get_or_insert_with(Default::default);
1360                    let mut body = document_body_from_named(named)?;
1361                    apply_patch_operations_to_json(&mut body, &document_body_ops)
1362                        .map_err(crate::RedDBError::Query)?;
1363                    replace_document_row_body(named, body, &mut modified_columns)?;
1364                }
1365
1366                if is_document_collection {
1367                    if let Some(body) = payload.get("body") {
1368                        context_index_dirty = true;
1369                        let named = row.named.get_or_insert_with(Default::default);
1370                        replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1371                    }
1372                }
1373
1374                if !field_ops.is_empty() {
1375                    context_index_dirty = true;
1376                    for op in &field_ops {
1377                        if let Some(col) = op.path.first() {
1378                            modified_columns.push(col.clone());
1379                        }
1380                    }
1381                    let named = row.named.get_or_insert_with(Default::default);
1382                    apply_patch_operations_to_storage_map(named, &field_ops)?;
1383                }
1384
1385                if let Some(fields) = payload
1386                    .get("fields")
1387                    .and_then(crate::json::Value::as_object)
1388                {
1389                    if row_contract_timestamps {
1390                        for key in fields.keys() {
1391                            if key == "created_at" || key == "updated_at" {
1392                                return Err(crate::RedDBError::Query(format!(
1393                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1394                                    collection, key
1395                                )));
1396                            }
1397                        }
1398                    }
1399                    context_index_dirty = true;
1400                    let named = row.named.get_or_insert_with(Default::default);
1401                    for (key, value) in fields {
1402                        modified_columns.push(key.clone());
1403                        named.insert(key.clone(), json_to_storage_value(value)?);
1404                    }
1405                }
1406
1407                if !metadata_ops.is_empty() {
1408                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1409                    let metadata = patch_metadata.get_or_insert_with(|| {
1410                        store.get_metadata(&collection, id).unwrap_or_default()
1411                    });
1412                    let mut metadata_json = metadata_to_json(metadata);
1413                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1414                        .map_err(crate::RedDBError::Query)?;
1415                    *metadata = metadata_from_json(&metadata_json)?;
1416                    metadata_changed = true;
1417                }
1418
1419                if !modified_columns.is_empty() || row_contract_timestamps {
1420                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1421                    let current_fields = if let Some(named) = row.named.take() {
1422                        named.into_iter().collect::<Vec<_>>()
1423                    } else if let Some(schema) = row.schema.as_ref() {
1424                        schema
1425                            .iter()
1426                            .cloned()
1427                            .zip(row.columns.iter().cloned())
1428                            .collect::<Vec<_>>()
1429                    } else {
1430                        Vec::new()
1431                    };
1432                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
1433                    if row_contract_timestamps {
1434                        modified_columns.push("updated_at".to_string());
1435                        context_index_dirty = true;
1436                    }
1437                    if contract.requires_uniqueness_check(&modified_columns) {
1438                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1439                    }
1440                    row.named = Some(normalized_fields.into_iter().collect());
1441                }
1442            }
1443            crate::storage::EntityData::Node(node) => {
1444                let mut field_ops = Vec::new();
1445                let mut metadata_ops = Vec::new();
1446                let mut node_type_ops = Vec::new();
1447
1448                for mut op in operations {
1449                    let Some(root) = op.path.first().map(String::as_str) else {
1450                        return Err(crate::RedDBError::Query(
1451                            "patch path cannot be empty".to_string(),
1452                        ));
1453                    };
1454
1455                    match root {
1456                        "fields" | "properties" => {
1457                            if op.path.len() < 2 {
1458                                return Err(crate::RedDBError::Query(
1459                                    "patch path 'fields' requires a nested key".to_string(),
1460                                ));
1461                            }
1462                            op.path.remove(0);
1463                            field_ops.push(op);
1464                        }
1465                        "metadata" => {
1466                            if op.path.len() < 2 {
1467                                return Err(crate::RedDBError::Query(
1468                                    "patch path 'metadata' requires a nested key".to_string(),
1469                                ));
1470                            }
1471                            op.path.remove(0);
1472                            metadata_ops.push(op);
1473                        }
1474                        "node_type" => {
1475                            if op.path.len() != 1 {
1476                                return Err(crate::RedDBError::Query(
1477                                    "patch path 'node_type' does not allow nested keys".to_string(),
1478                                ));
1479                            }
1480                            op.path.clear();
1481                            node_type_ops.push(op);
1482                        }
1483                        _ => {
1484                            return Err(crate::RedDBError::Query(format!(
1485                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1486                            )));
1487                        }
1488                    }
1489                }
1490
1491                for op in node_type_ops {
1492                    context_index_dirty = true;
1493                    let value = op.value.ok_or_else(|| {
1494                        crate::RedDBError::Query("node_type operations require a value".to_string())
1495                    })?;
1496
1497                    match op.op {
1498                        PatchEntityOperationType::Unset => {
1499                            return Err(crate::RedDBError::Query(
1500                                "node_type cannot be unset through patch operations".to_string(),
1501                            ));
1502                        }
1503                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1504                            let Some(node_type) = value.as_str() else {
1505                                return Err(crate::RedDBError::Query(
1506                                    "node_type operation requires a text value".to_string(),
1507                                ));
1508                            };
1509                            graph_node_type = Some(node_type.to_string());
1510                            modified_columns.push("node_type".to_string());
1511                        }
1512                    }
1513                }
1514
1515                if !field_ops.is_empty() {
1516                    context_index_dirty = true;
1517                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1518                }
1519
1520                if let Some(fields) = payload
1521                    .get("fields")
1522                    .and_then(crate::json::Value::as_object)
1523                {
1524                    context_index_dirty = true;
1525                    for (key, value) in fields {
1526                        node.properties
1527                            .insert(key.clone(), json_to_storage_value(value)?);
1528                    }
1529                }
1530
1531                if !metadata_ops.is_empty() {
1532                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1533                    let metadata = patch_metadata.get_or_insert_with(|| {
1534                        store.get_metadata(&collection, id).unwrap_or_default()
1535                    });
1536                    let mut metadata_json = metadata_to_json(metadata);
1537                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1538                        .map_err(crate::RedDBError::Query)?;
1539                    *metadata = metadata_from_json(&metadata_json)?;
1540                    metadata_changed = true;
1541                }
1542            }
1543            crate::storage::EntityData::Edge(edge) => {
1544                let mut field_ops = Vec::new();
1545                let mut metadata_ops = Vec::new();
1546                let mut weight_ops = Vec::new();
1547
1548                for mut op in operations {
1549                    let Some(root) = op.path.first().map(String::as_str) else {
1550                        return Err(crate::RedDBError::Query(
1551                            "patch path cannot be empty".to_string(),
1552                        ));
1553                    };
1554
1555                    match root {
1556                        "fields" | "properties" => {
1557                            if op.path.len() < 2 {
1558                                return Err(crate::RedDBError::Query(
1559                                    "patch path 'fields' requires a nested key".to_string(),
1560                                ));
1561                            }
1562                            op.path.remove(0);
1563                            field_ops.push(op);
1564                        }
1565                        "weight" => {
1566                            if op.path.len() != 1 {
1567                                return Err(crate::RedDBError::Query(
1568                                    "patch path 'weight' does not allow nested keys".to_string(),
1569                                ));
1570                            }
1571                            op.path.clear();
1572                            weight_ops.push(op);
1573                        }
1574                        "metadata" => {
1575                            if op.path.len() < 2 {
1576                                return Err(crate::RedDBError::Query(
1577                                    "patch path 'metadata' requires a nested key".to_string(),
1578                                ));
1579                            }
1580                            op.path.remove(0);
1581                            metadata_ops.push(op);
1582                        }
1583                        _ => {
1584                            return Err(crate::RedDBError::Query(format!(
1585                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1586                            )));
1587                        }
1588                    }
1589                }
1590
1591                if !field_ops.is_empty() {
1592                    context_index_dirty = true;
1593                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1594                }
1595
1596                for op in weight_ops {
1597                    context_index_dirty = true;
1598                    let value = op.value.ok_or_else(|| {
1599                        crate::RedDBError::Query("weight operations require a value".to_string())
1600                    })?;
1601
1602                    match op.op {
1603                        PatchEntityOperationType::Unset => {
1604                            return Err(crate::RedDBError::Query(
1605                                "weight cannot be unset through patch operations".to_string(),
1606                            ));
1607                        }
1608                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1609                            let Some(weight) = value.as_f64() else {
1610                                return Err(crate::RedDBError::Query(
1611                                    "weight operation requires a numeric value".to_string(),
1612                                ));
1613                            };
1614                            graph_edge_weight = Some(weight as f32);
1615                            edge.weight = weight as f32;
1616                            modified_columns.push("weight".to_string());
1617                        }
1618                    }
1619                }
1620
1621                if let Some(fields) = payload
1622                    .get("fields")
1623                    .and_then(crate::json::Value::as_object)
1624                {
1625                    context_index_dirty = true;
1626                    for (key, value) in fields {
1627                        edge.properties
1628                            .insert(key.clone(), json_to_storage_value(value)?);
1629                    }
1630                }
1631
1632                if !metadata_ops.is_empty() {
1633                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1634                    let metadata = patch_metadata.get_or_insert_with(|| {
1635                        store.get_metadata(&collection, id).unwrap_or_default()
1636                    });
1637                    let mut metadata_json = metadata_to_json(metadata);
1638                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1639                        .map_err(crate::RedDBError::Query)?;
1640                    *metadata = metadata_from_json(&metadata_json)?;
1641                    metadata_changed = true;
1642                }
1643            }
1644            crate::storage::EntityData::Vector(vector) => {
1645                let mut field_ops = Vec::new();
1646                let mut metadata_ops = Vec::new();
1647
1648                for mut op in operations {
1649                    let Some(root) = op.path.first().map(String::as_str) else {
1650                        return Err(crate::RedDBError::Query(
1651                            "patch path cannot be empty".to_string(),
1652                        ));
1653                    };
1654
1655                    match root {
1656                        "fields" => {
1657                            if op.path.len() < 2 {
1658                                return Err(crate::RedDBError::Query(
1659                                    "patch path 'fields' requires a nested key".to_string(),
1660                                ));
1661                            }
1662                            op.path.remove(0);
1663                            let Some(target) = op.path.first().map(String::as_str) else {
1664                                return Err(crate::RedDBError::Query(
1665                                    "patch path requires a target under fields".to_string(),
1666                                ));
1667                            };
1668                            if !matches!(target, "dense" | "content" | "sparse") {
1669                                return Err(crate::RedDBError::Query(format!(
1670                                    "unsupported vector patch target '{target}'"
1671                                )));
1672                            }
1673                            field_ops.push(op);
1674                        }
1675                        "metadata" => {
1676                            if op.path.len() < 2 {
1677                                return Err(crate::RedDBError::Query(
1678                                    "patch path 'metadata' requires a nested key".to_string(),
1679                                ));
1680                            }
1681                            op.path.remove(0);
1682                            metadata_ops.push(op);
1683                        }
1684                        _ => {
1685                            return Err(crate::RedDBError::Query(format!(
1686                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1687                            )));
1688                        }
1689                    }
1690                }
1691
1692                if !field_ops.is_empty() {
1693                    context_index_dirty = true;
1694                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1695                }
1696
1697                if let Some(fields) = payload
1698                    .get("fields")
1699                    .and_then(crate::json::Value::as_object)
1700                {
1701                    context_index_dirty = true;
1702                    if let Some(content) =
1703                        fields.get("content").and_then(crate::json::Value::as_str)
1704                    {
1705                        vector.content = Some(content.to_string());
1706                    }
1707                    if let Some(dense) = fields.get("dense") {
1708                        vector.dense = dense
1709                            .as_array()
1710                            .ok_or_else(|| {
1711                                crate::RedDBError::Query(
1712                                    "field 'dense' must be an array".to_string(),
1713                                )
1714                            })?
1715                            .iter()
1716                            .map(|value| {
1717                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
1718                                    crate::RedDBError::Query(
1719                                        "field 'dense' must contain only numbers".to_string(),
1720                                    )
1721                                })
1722                            })
1723                            .collect::<Result<Vec<_>, _>>()?;
1724                    }
1725                }
1726
1727                if !metadata_ops.is_empty() {
1728                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1729                    let metadata = patch_metadata.get_or_insert_with(|| {
1730                        store.get_metadata(&collection, id).unwrap_or_default()
1731                    });
1732                    let mut metadata_json = metadata_to_json(metadata);
1733                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1734                        .map_err(crate::RedDBError::Query)?;
1735                    *metadata = metadata_from_json(&metadata_json)?;
1736                    metadata_changed = true;
1737                }
1738            }
1739            crate::storage::EntityData::TimeSeries(_)
1740            | crate::storage::EntityData::QueueMessage(_) => {
1741                return Err(crate::RedDBError::Query(
1742                    "patch operations are not supported for TimeSeries or QueueMessage entities"
1743                        .to_string(),
1744                ));
1745            }
1746        }
1747
1748        if let Some(node_type) = graph_node_type {
1749            if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1750                node.node_type = node_type;
1751            }
1752        }
1753        if let Some(weight) = graph_edge_weight {
1754            if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1755                edge.weight = (weight * 1000.0) as u32;
1756            }
1757        }
1758
1759        if let Some(metadata) = payload
1760            .get("metadata")
1761            .and_then(crate::json::Value::as_object)
1762        {
1763            let patch_metadata = patch_metadata
1764                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1765            for (key, value) in metadata {
1766                ensure_non_tree_reserved_metadata_key(key)?;
1767                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1768            }
1769            metadata_changed = true;
1770        }
1771
1772        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1773            let patch_metadata = patch_metadata
1774                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1775            if matches!(value, crate::storage::unified::MetadataValue::Null) {
1776                patch_metadata.remove(&key);
1777            } else {
1778                patch_metadata.set(key, value);
1779            }
1780            metadata_changed = true;
1781        }
1782
1783        entity.updated_at = std::time::SystemTime::now()
1784            .duration_since(std::time::UNIX_EPOCH)
1785            .unwrap_or_default()
1786            .as_secs();
1787
1788        modified_columns = dedupe_modified_columns(modified_columns);
1789
1790        Ok(AppliedEntityMutation {
1791            id,
1792            collection,
1793            entity,
1794            metadata: patch_metadata,
1795            modified_columns,
1796            persist_metadata: metadata_changed,
1797            context_index_dirty,
1798            replaced_entity: None,
1799            replaced_entity_previous_xmax: 0,
1800            pre_mutation_fields,
1801        })
1802    }
1803
1804    pub(crate) fn apply_loaded_sql_update_row_core(
1805        &self,
1806        collection: String,
1807        mut entity: crate::storage::UnifiedEntity,
1808        static_field_assignments: &[(String, Value)],
1809        dynamic_field_assignments: Vec<(String, Value)>,
1810        static_metadata_assignments: &[(String, MetadataValue)],
1811        dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1812        row_contract_plan: Option<&RowUpdateContractPlan>,
1813        row_modified_columns_template: &[String],
1814        row_touches_unique_columns: bool,
1815    ) -> RedDBResult<AppliedEntityMutation> {
1816        let id = entity.id;
1817        let previous_xmax = entity.xmax;
1818        let db = self.db();
1819        let store = db.store();
1820        let Some(_) = store.get_collection(&collection) else {
1821            return Err(crate::RedDBError::NotFound(format!(
1822                "collection not found: {collection}"
1823            )));
1824        };
1825
1826        let versioned_update_xid = match self.current_xid() {
1827            Some(xid) => Some(xid),
1828            None => {
1829                let snapshot_manager = self.snapshot_manager();
1830                let xid = snapshot_manager.begin();
1831                snapshot_manager.commit(xid);
1832                Some(xid)
1833            }
1834        };
1835        let mut replaced_entity = versioned_update_xid.map(|xid| {
1836            let mut old = entity.clone();
1837            old.set_xmax(xid);
1838            old
1839        });
1840
1841        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1842        let row_contract_timestamps = row_contract_plan
1843            .map(|plan| plan.timestamps_enabled)
1844            .unwrap_or(false);
1845        let mut metadata_changed = false;
1846        let mut modified_columns = row_modified_columns_template.to_vec();
1847        let mut context_index_dirty = !modified_columns.is_empty();
1848
1849        // Snapshot OLD field values BEFORE applying the assignments —
1850        // the secondary-index maintenance hook needs both before/after to
1851        // delete-then-insert under changed indexed columns.
1852        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1853
1854        let crate::storage::EntityData::Row(row) = &mut entity.data else {
1855            return Err(crate::RedDBError::Query(
1856                "SQL row update fast path requires a row entity".to_string(),
1857            ));
1858        };
1859
1860        let _ = row_contract_plan;
1861        apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1862        apply_row_field_assignments_raw(row, dynamic_field_assignments);
1863
1864        for (key, value) in static_metadata_assignments
1865            .iter()
1866            .cloned()
1867            .chain(dynamic_metadata_assignments)
1868        {
1869            ensure_non_tree_reserved_metadata_key(&key)?;
1870            patch_metadata
1871                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1872                .set(key, value);
1873            metadata_changed = true;
1874        }
1875
1876        if !modified_columns.is_empty() || row_contract_timestamps {
1877            let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1878            if row_contract_timestamps {
1879                context_index_dirty = true;
1880                set_row_field(
1881                    row,
1882                    "updated_at",
1883                    Value::UnsignedInteger(current_unix_ms_u64()),
1884                );
1885                modified_columns.push("updated_at".to_string());
1886            }
1887            if row_touches_unique_columns {
1888                let current_fields = collect_row_fields(row);
1889                contract.enforce_row_uniqueness(&current_fields, Some(id))?;
1890            }
1891        }
1892
1893        entity.updated_at = std::time::SystemTime::now()
1894            .duration_since(std::time::UNIX_EPOCH)
1895            .unwrap_or_default()
1896            .as_secs();
1897
1898        if let Some(xid) = versioned_update_xid {
1899            let logical_id = entity.logical_id();
1900            entity.id = store.next_entity_id();
1901            entity.set_logical_id(logical_id);
1902            entity.set_xmin(xid);
1903            entity.set_xmax(0);
1904            if let Some(old) = replaced_entity.as_mut() {
1905                old.set_xmax(xid);
1906            }
1907        }
1908
1909        modified_columns = dedupe_modified_columns(modified_columns);
1910
1911        Ok(AppliedEntityMutation {
1912            id: entity.id,
1913            collection,
1914            entity,
1915            metadata: patch_metadata,
1916            modified_columns,
1917            persist_metadata: metadata_changed,
1918            context_index_dirty,
1919            replaced_entity,
1920            replaced_entity_previous_xmax: previous_xmax,
1921            pre_mutation_fields,
1922        })
1923    }
1924
1925    pub(crate) fn persist_applied_entity_mutations(
1926        &self,
1927        applied: &[AppliedEntityMutation],
1928    ) -> RedDBResult<()> {
1929        if applied.is_empty() {
1930            return Ok(());
1931        }
1932
1933        let store = self.db().store();
1934        let collection = &applied[0].collection;
1935        let Some(manager) = store.get_collection(collection) else {
1936            return Err(crate::RedDBError::NotFound(format!(
1937                "collection not found: {collection}"
1938            )));
1939        };
1940
1941        let mut ordinary = Vec::with_capacity(applied.len());
1942        for item in applied {
1943            if let Some(old_version) = item.replaced_entity.as_ref() {
1944                store
1945                    .install_versioned_table_row_update(
1946                        collection,
1947                        old_version.clone(),
1948                        item.entity.clone(),
1949                        if item.persist_metadata {
1950                            item.metadata.as_ref()
1951                        } else {
1952                            None
1953                        },
1954                    )
1955                    .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1956                if self.current_xid().is_some() {
1957                    self.record_pending_versioned_update(
1958                        crate::runtime::impl_core::current_connection_id(),
1959                        collection,
1960                        old_version.id,
1961                        item.entity.id,
1962                        old_version.xmax,
1963                        item.replaced_entity_previous_xmax,
1964                    );
1965                }
1966            } else {
1967                ordinary.push(item);
1968            }
1969        }
1970        if ordinary.is_empty() {
1971            return Ok(());
1972        }
1973
1974        manager
1975            .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1976                (
1977                    &item.entity,
1978                    item.modified_columns.as_slice(),
1979                    if item.persist_metadata {
1980                        item.metadata.as_ref()
1981                    } else {
1982                        None
1983                    },
1984                )
1985            }))
1986            .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1987
1988        // PG-HOT-like fast path: segment in-place is done; when no
1989        // mutation touches a secondary-indexed column AND no metadata
1990        // payload needs to be folded into a B-tree record, skip the
1991        // in-line B-tree upsert. The WAL still records the update
1992        // (durability preserved; recovery replay rebuilds the B-tree),
1993        // and `manager.get()` prefers the live segment over the
1994        // B-tree for reads — so the short-circuit is invisible to
1995        // callers. See `persist_entities_to_pager_wal_only`.
1996        let indexed_cols = self
1997            .index_store_ref()
1998            .indexed_columns_set(collection.as_str());
1999        let all_hot = !indexed_cols.is_empty()
2000            && ordinary.iter().all(|item| {
2001                !item.persist_metadata
2002                    && !item
2003                        .modified_columns
2004                        .iter()
2005                        .any(|c| indexed_cols.contains(c))
2006            })
2007            || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2008
2009        // Pass `&[&UnifiedEntity]` — no per-entity clone. The SQL UPDATE
2010        // inner loop hands us `applied` which already owns the post-image
2011        // entity; all we need for the persist path is a read borrow.
2012        let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2013            ordinary.iter().map(|item| &item.entity).collect();
2014        let persist_fn = if all_hot {
2015            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2016        } else {
2017            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2018        };
2019        persist_fn(store.as_ref(), collection, &entity_refs)
2020            .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2021    }
2022
2023    pub(crate) fn flush_applied_entity_mutation(
2024        &self,
2025        applied: &AppliedEntityMutation,
2026    ) -> RedDBResult<()> {
2027        let store = self.db().store();
2028        if applied.context_index_dirty {
2029            store
2030                .context_index()
2031                .index_entity(&applied.collection, &applied.entity);
2032        }
2033        // Secondary-index maintenance for SQL UPDATE / JSON-Patch flows.
2034        // Skip when pre_mutation_fields is empty (entity wasn't a row, or
2035        // didn't carry recoverable column names) — there's nothing to
2036        // delete-then-insert in that case.
2037        //
2038        // Also build the CDC damage vector here so downstream consumers
2039        // see which columns changed without re-diffing.
2040        let mut changed_columns: Option<Vec<String>> = None;
2041        if !applied.pre_mutation_fields.is_empty() {
2042            let post = entity_row_fields_snapshot(&applied.entity);
2043            if !post.is_empty() {
2044                let damage = crate::application::entity::row_damage_vector(
2045                    &applied.pre_mutation_fields,
2046                    &post,
2047                );
2048                if !damage.is_empty() {
2049                    changed_columns = Some(
2050                        damage
2051                            .touched_columns()
2052                            .into_iter()
2053                            .map(str::to_string)
2054                            .collect(),
2055                    );
2056                }
2057
2058                // HOT-like fast path (P3.T2/T3): when no modified
2059                // column is covered by a secondary index, skip the
2060                // `index_entity_update` call entirely. The function
2061                // would short-circuit internally, but the call still
2062                // reads the registry lock + walks the damage vector
2063                // — avoiding it saves a few microseconds per UPDATE.
2064                // Page-local replace + t_ctid chain support (true
2065                // HOT) lives in a follow-up storage spec.
2066                let indexed_cols: std::collections::HashSet<String> = self
2067                    .index_store_ref()
2068                    .list_indices(applied.collection.as_str())
2069                    .into_iter()
2070                    .filter_map(|idx| idx.columns.first().cloned())
2071                    .collect();
2072                let modified_cols: std::collections::HashSet<String> = damage
2073                    .touched_columns()
2074                    .into_iter()
2075                    .map(str::to_string)
2076                    .collect();
2077                if let Some(old_version) = applied.replaced_entity.as_ref() {
2078                    let old_index_fields: Vec<(String, Value)> = applied
2079                        .pre_mutation_fields
2080                        .iter()
2081                        .filter(|(col, _)| indexed_cols.contains(col))
2082                        .cloned()
2083                        .collect();
2084                    let new_index_fields: Vec<(String, Value)> = post
2085                        .iter()
2086                        .filter(|(col, _)| indexed_cols.contains(col))
2087                        .cloned()
2088                        .collect();
2089                    if !old_index_fields.is_empty() {
2090                        self.index_store_ref()
2091                            .index_entity_delete(
2092                                &applied.collection,
2093                                old_version.id,
2094                                &old_index_fields,
2095                            )
2096                            .map_err(crate::RedDBError::Internal)?;
2097                    }
2098                    if !new_index_fields.is_empty() {
2099                        self.index_store_ref()
2100                            .index_entity_insert(
2101                                &applied.collection,
2102                                applied.entity.id,
2103                                &new_index_fields,
2104                            )
2105                            .map_err(crate::RedDBError::Internal)?;
2106                    }
2107                } else {
2108                    let decision = crate::storage::engine::hot_update::decide(
2109                        &crate::storage::engine::hot_update::HotUpdateInputs {
2110                            collection: applied.collection.as_str(),
2111                            indexed_columns: &indexed_cols,
2112                            modified_columns: &modified_cols,
2113                            // The storage layer currently handles fit via
2114                            // the segment abstraction; we bypass the
2115                            // page-size check here.
2116                            new_tuple_size: 0,
2117                            page_free_space: usize::MAX,
2118                        },
2119                    );
2120                    if !decision.can_hot {
2121                        self.index_store_ref()
2122                            .index_entity_update(
2123                                &applied.collection,
2124                                applied.id,
2125                                &applied.pre_mutation_fields,
2126                                &post,
2127                            )
2128                            .map_err(crate::RedDBError::Internal)?;
2129                    } else {
2130                        // F-04: `applied.collection` is tenant-supplied;
2131                        // strip CR/LF/control bytes via the LogField
2132                        // escaper (ADR 0010).
2133                        tracing::debug!(
2134                            collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2135                            "hot_update fast-path: skipped index_entity_update"
2136                        );
2137                    }
2138                }
2139            }
2140        }
2141        self.cdc_emit_prebuilt_with_columns(
2142            crate::replication::cdc::ChangeOperation::Update,
2143            &applied.collection,
2144            &applied.entity,
2145            "entity",
2146            applied.metadata.as_ref(),
2147            true,
2148            changed_columns,
2149        );
2150        Ok(())
2151    }
2152
2153    pub(crate) fn apply_loaded_patch_entity(
2154        &self,
2155        collection: String,
2156        entity: crate::storage::UnifiedEntity,
2157        payload: JsonValue,
2158        operations: Vec<PatchEntityOperation>,
2159    ) -> RedDBResult<CreateEntityOutput> {
2160        let applied =
2161            self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2162        self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2163        self.flush_applied_entity_mutation(&applied)?;
2164        Ok(CreateEntityOutput {
2165            id: applied.id,
2166            entity: Some(applied.entity),
2167        })
2168    }
2169}
2170
2171fn ensure_non_tree_reserved_metadata_patch_paths(
2172    operations: &[PatchEntityOperation],
2173) -> RedDBResult<()> {
2174    for operation in operations {
2175        let Some(key) = operation.path.first().map(String::as_str) else {
2176            continue;
2177        };
2178        ensure_non_tree_reserved_metadata_key(key)?;
2179    }
2180    Ok(())
2181}
2182
2183fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2184    if key.starts_with(TREE_METADATA_PREFIX) {
2185        return Err(crate::RedDBError::Query(format!(
2186            "metadata key '{}' is reserved for managed trees",
2187            key
2188        )));
2189    }
2190    Ok(())
2191}
2192
2193fn ensure_non_tree_reserved_metadata_entries(
2194    metadata: &[(String, MetadataValue)],
2195) -> RedDBResult<()> {
2196    for (key, _) in metadata {
2197        ensure_non_tree_reserved_metadata_key(key)?;
2198    }
2199    Ok(())
2200}
2201
2202fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2203    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2204        return Err(crate::RedDBError::Query(format!(
2205            "edge label '{}' is reserved for managed trees",
2206            TREE_CHILD_EDGE_LABEL
2207        )));
2208    }
2209    Ok(())
2210}
2211
2212impl RedDBRuntime {
2213    pub(crate) fn create_node_unchecked(
2214        &self,
2215        input: CreateNodeInput,
2216    ) -> RedDBResult<CreateEntityOutput> {
2217        let db = self.db();
2218        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2219        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2220        let mut metadata = input.metadata;
2221        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2222        let mut builder = db.node(&input.collection, &input.label);
2223
2224        if let Some(node_type) = input.node_type {
2225            builder = builder.node_type(node_type);
2226        }
2227
2228        for (key, value) in input.properties {
2229            builder = builder.property(key, value);
2230        }
2231
2232        for (key, value) in metadata {
2233            builder = builder.metadata(key, value);
2234        }
2235
2236        for embedding in input.embeddings {
2237            if let Some(model) = embedding.model {
2238                builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2239            } else {
2240                builder = builder.embedding(embedding.name, embedding.vector);
2241            }
2242        }
2243
2244        for link in input.table_links {
2245            builder = builder.link_to_table(link.key, link.table);
2246        }
2247
2248        for link in input.node_links {
2249            builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2250        }
2251
2252        let id = builder.save()?;
2253        // Phase 1.1 MVCC universal: stamp xmin so concurrent snapshots
2254        // don't see this node until the transaction commits.
2255        self.stamp_xmin_if_in_txn(&input.collection, id);
2256        refresh_context_index(&db, &input.collection, id)?;
2257        self.cdc_emit(
2258            crate::replication::cdc::ChangeOperation::Insert,
2259            &input.collection,
2260            id.raw(),
2261            "graph_node",
2262        );
2263        Ok(CreateEntityOutput {
2264            id,
2265            entity: db.store().get(&input.collection, id),
2266        })
2267    }
2268
2269    pub(crate) fn create_edge_unchecked(
2270        &self,
2271        input: CreateEdgeInput,
2272    ) -> RedDBResult<CreateEntityOutput> {
2273        let db = self.db();
2274        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2275        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2276        let mut metadata = input.metadata;
2277        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2278        let mut builder = db
2279            .edge(&input.collection, &input.label)
2280            .from(input.from)
2281            .to(input.to);
2282
2283        if let Some(weight) = input.weight {
2284            builder = builder.weight(weight);
2285        }
2286
2287        for (key, value) in input.properties {
2288            builder = builder.property(key, value);
2289        }
2290
2291        for (key, value) in metadata {
2292            builder = builder.metadata(key, value);
2293        }
2294
2295        let id = builder.save()?;
2296        // Phase 1.1 MVCC universal: stamp xmin on the edge so other
2297        // sessions don't follow it until COMMIT.
2298        self.stamp_xmin_if_in_txn(&input.collection, id);
2299        refresh_context_index(&db, &input.collection, id)?;
2300        self.cdc_emit(
2301            crate::replication::cdc::ChangeOperation::Insert,
2302            &input.collection,
2303            id.raw(),
2304            "graph_edge",
2305        );
2306        Ok(CreateEntityOutput {
2307            id,
2308            entity: db.store().get(&input.collection, id),
2309        })
2310    }
2311}
2312
2313fn create_rows_batch_prevalidated_columnar_with_outputs(
2314    runtime: &RedDBRuntime,
2315    collection: String,
2316    column_names: std::sync::Arc<Vec<String>>,
2317    rows: Vec<Vec<crate::storage::schema::Value>>,
2318) -> RedDBResult<Vec<CreateEntityOutput>> {
2319    use crate::storage::{
2320        unified::{EntityData, EntityKind, RowData},
2321        EntityId, UnifiedEntity,
2322    };
2323    use std::sync::Arc;
2324
2325    if rows.is_empty() {
2326        return Ok(Vec::new());
2327    }
2328    runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2329    runtime.check_batch_size(rows.len())?;
2330    runtime.check_db_size()?;
2331
2332    let db = runtime.db();
2333    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2334    contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2335
2336    let store = db.store();
2337    let table_arc: Arc<str> = Arc::from(collection.as_str());
2338
2339    let indexed_cols = runtime
2340        .index_store_ref()
2341        .indexed_columns_set(collection.as_str());
2342    let has_secondary_indexes = !indexed_cols.is_empty();
2343    let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2344        if has_secondary_indexes {
2345            Vec::with_capacity(rows.len())
2346        } else {
2347            Vec::new()
2348        };
2349
2350    let entities: Vec<UnifiedEntity> = rows
2351        .into_iter()
2352        .map(|values| {
2353            if has_secondary_indexes {
2354                let mut snap: Vec<(String, crate::storage::schema::Value)> =
2355                    Vec::with_capacity(indexed_cols.len());
2356                for (name, val) in column_names.iter().zip(values.iter()) {
2357                    if indexed_cols.contains(name) {
2358                        snap.push((name.clone(), val.clone()));
2359                    }
2360                }
2361                field_snapshots.push(snap);
2362            }
2363            let mut row = RowData::new(values);
2364            row.schema = Some(Arc::clone(&column_names));
2365            UnifiedEntity::new(
2366                EntityId::new(0),
2367                EntityKind::TableRow {
2368                    table: Arc::clone(&table_arc),
2369                    row_id: 0,
2370                },
2371                EntityData::Row(row),
2372            )
2373        })
2374        .collect();
2375
2376    let ids = store
2377        .bulk_insert(&collection, entities)
2378        .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2379
2380    if has_secondary_indexes {
2381        let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2382            .iter()
2383            .zip(field_snapshots)
2384            .map(|(id, fields)| (*id, fields))
2385            .collect();
2386        runtime
2387            .index_store_ref()
2388            .index_entity_insert_batch(&collection, &index_rows)
2389            .map_err(crate::RedDBError::Internal)?;
2390    }
2391
2392    runtime.invalidate_result_cache();
2393    runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2394
2395    Ok(ids
2396        .into_iter()
2397        .map(|id| CreateEntityOutput { id, entity: None })
2398        .collect())
2399}
2400
2401impl RuntimeEntityPort for RedDBRuntime {
2402    fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2403        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2404        let db = self.db();
2405        let CreateRowInput {
2406            collection,
2407            fields,
2408            metadata: input_metadata,
2409            node_links,
2410            vector_links,
2411        } = input;
2412        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2413        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2414        let mut metadata = input_metadata;
2415        apply_collection_default_ttl(&db, &collection, &mut metadata);
2416        let fields = contract.normalize_insert_fields(fields)?;
2417        contract.enforce_row_uniqueness(&fields, None)?;
2418        // Route through MutationEngine for unified hot path.
2419        let engine = self.mutation_engine();
2420        let result = engine.apply(
2421            collection.clone(),
2422            vec![crate::runtime::mutation::MutationRow {
2423                fields,
2424                metadata,
2425                node_links,
2426                vector_links,
2427            }],
2428        )?;
2429        let id = result.ids[0];
2430        // Perf: `db.get(id)` does a *cross-collection* scan (get_any)
2431        // that also takes a write lock on the entity cache. We know
2432        // the collection — hit the manager directly. Cuts
2433        // create_row() p50 roughly in half on the hot path.
2434        Ok(CreateEntityOutput {
2435            id,
2436            entity: db.store().get(&collection, id),
2437        })
2438    }
2439
2440    fn create_rows_batch(
2441        &self,
2442        input: CreateRowsBatchInput,
2443    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2444        if input.rows.is_empty() {
2445            return Ok(Vec::new());
2446        }
2447        self.check_batch_size(input.rows.len())?;
2448        self.check_db_size()?;
2449
2450        let db = self.db();
2451        let collection = input.collection;
2452        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2453        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2454
2455        let mut prepared_rows = Vec::with_capacity(input.rows.len());
2456        let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2457        for row in input.rows {
2458            if row.collection != collection {
2459                return Err(crate::RedDBError::Query(format!(
2460                    "batch row collection mismatch: expected '{}', got '{}'",
2461                    collection, row.collection
2462                )));
2463            }
2464
2465            let mut metadata = row.metadata;
2466            apply_collection_default_ttl(&db, &collection, &mut metadata);
2467            let fields = contract.normalize_insert_fields(row.fields)?;
2468            contract.enforce_row_uniqueness(&fields, None)?;
2469            uniqueness_rows.push(fields.clone());
2470            prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2471        }
2472
2473        contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2474
2475        // Route through MutationEngine: single bulk_insert + one CDC batch
2476        // instead of N separate cdc_emit() calls (each acquires a write lock).
2477        let engine = {
2478            let e = self.mutation_engine();
2479            if input.suppress_events {
2480                e.with_suppress_events()
2481            } else {
2482                e
2483            }
2484        };
2485        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2486            .into_iter()
2487            .map(|(fields, metadata, node_links, vector_links)| {
2488                crate::runtime::mutation::MutationRow {
2489                    fields,
2490                    metadata,
2491                    node_links,
2492                    vector_links,
2493                }
2494            })
2495            .collect();
2496
2497        let result = engine
2498            .apply(collection.clone(), mutation_rows)
2499            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2500
2501        let store = db.store();
2502        Ok(result
2503            .ids
2504            .into_iter()
2505            .map(|id| CreateEntityOutput {
2506                id,
2507                entity: store.get(&collection, id),
2508            })
2509            .collect())
2510    }
2511
2512    fn create_rows_batch_prevalidated_columnar(
2513        &self,
2514        collection: String,
2515        column_names: std::sync::Arc<Vec<String>>,
2516        rows: Vec<Vec<crate::storage::schema::Value>>,
2517    ) -> RedDBResult<usize> {
2518        create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2519            .map(|outputs| outputs.len())
2520    }
2521
2522    fn create_rows_batch_columnar(
2523        &self,
2524        collection: String,
2525        column_names: std::sync::Arc<Vec<String>>,
2526        rows: Vec<Vec<crate::storage::schema::Value>>,
2527    ) -> RedDBResult<usize> {
2528        self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2529            .map(|outputs| outputs.len())
2530    }
2531
2532    fn create_rows_batch_columnar_with_outputs(
2533        &self,
2534        collection: String,
2535        column_names: std::sync::Arc<Vec<String>>,
2536        rows: Vec<Vec<crate::storage::schema::Value>>,
2537    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2538        if rows.is_empty() {
2539            return Ok(Vec::new());
2540        }
2541        self.check_batch_size(rows.len())?;
2542        self.check_db_size()?;
2543
2544        let db = self.db();
2545        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2546        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2547
2548        // Fast path: when the collection carries no contract (or the
2549        // contract has no declared columns) `normalize_insert_fields`
2550        // is a no-op and `enforce_row_uniqueness` finds no unique
2551        // columns to check. Skip the per-row `(String, Value)` tuple
2552        // materialisation entirely and route through the prevalidated
2553        // columnar kernel — same write, CDC, and index path, just
2554        // without the wasted (String, Value) clones. This is the
2555        // bench `bench_users` shape (no contract declared by the
2556        // adapter's `setup_schema`).
2557        let needs_normalisation = match db.collection_contract(&collection) {
2558            Some(c) => {
2559                c.declared_model == crate::catalog::CollectionModel::Table
2560                    && (!c.declared_columns.is_empty()
2561                        || c.table_def
2562                            .as_ref()
2563                            .map(|t| !t.columns.is_empty())
2564                            .unwrap_or(false))
2565            }
2566            None => false,
2567        };
2568        if !needs_normalisation {
2569            return create_rows_batch_prevalidated_columnar_with_outputs(
2570                self,
2571                collection,
2572                column_names,
2573                rows,
2574            );
2575        }
2576
2577        // Slow path: contract requires per-row normalisation /
2578        // uniqueness checks. Materialise `Vec<(String, Value)>` from
2579        // the columnar layout (this still pays N×ncols `String::clone`
2580        // — but it's deferred out of the wire decoder hot loop and
2581        // happens behind a runtime API, not the per-row decode loop)
2582        // and fall through to the existing `create_rows_batch` path.
2583        let ncols = column_names.len();
2584        let tuple_rows: Vec<CreateRowInput> = rows
2585            .into_iter()
2586            .map(|values| {
2587                let mut fields: Vec<(String, crate::storage::schema::Value)> =
2588                    Vec::with_capacity(ncols);
2589                for (name, value) in column_names.iter().zip(values) {
2590                    fields.push((name.clone(), value));
2591                }
2592                CreateRowInput {
2593                    collection: collection.clone(),
2594                    fields,
2595                    metadata: Vec::new(),
2596                    node_links: Vec::new(),
2597                    vector_links: Vec::new(),
2598                }
2599            })
2600            .collect();
2601        self.create_rows_batch(CreateRowsBatchInput {
2602            collection,
2603            rows: tuple_rows,
2604            suppress_events: false,
2605        })
2606    }
2607
2608    fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2609        if input.rows.is_empty() {
2610            return Ok(0);
2611        }
2612        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2613        self.check_batch_size(input.rows.len())?;
2614        self.check_db_size()?;
2615
2616        let db = self.db();
2617        let collection = input.collection;
2618        // Still verify the collection's declared model before we blast
2619        // rows at it — this one-off check is O(1), independent of
2620        // ncols, and catches schema-kind mismatches that the client
2621        // can't always see.
2622        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2623        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2624
2625        // Hoist the per-collection default TTL lookup out of the
2626        // per-row loop — it only depends on the collection, not on
2627        // individual rows, and the old path did one HashMap read per
2628        // row (25k reads for a 25k typed_insert bulk). Fetch it once,
2629        // apply to any row whose metadata doesn't already carry a TTL.
2630        let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2631
2632        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2633            Vec::with_capacity(input.rows.len());
2634        let mut mutation_rows = mutation_rows;
2635        for row in input.rows {
2636            if row.collection != collection {
2637                return Err(crate::RedDBError::Query(format!(
2638                    "batch row collection mismatch: expected '{}', got '{}'",
2639                    collection, row.collection
2640                )));
2641            }
2642            let mut metadata = row.metadata;
2643            if let Some(ttl) = default_ttl_ms {
2644                if !has_internal_ttl_metadata(&metadata) {
2645                    metadata.push((
2646                        "_ttl_ms".to_string(),
2647                        if ttl <= i64::MAX as u64 {
2648                            MetadataValue::Int(ttl as i64)
2649                        } else {
2650                            MetadataValue::Timestamp(ttl)
2651                        },
2652                    ));
2653                }
2654            }
2655            mutation_rows.push(crate::runtime::mutation::MutationRow {
2656                fields: row.fields,
2657                metadata,
2658                node_links: row.node_links,
2659                vector_links: row.vector_links,
2660            });
2661        }
2662
2663        let engine = self.mutation_engine();
2664        let result = engine
2665            .apply(collection, mutation_rows)
2666            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2667        Ok(result.ids.len())
2668    }
2669
2670    fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2671        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2672        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2673            input.properties.iter().map(|(key, _)| key.as_str()),
2674            &format!("node '{}'", input.collection),
2675        )?;
2676        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2677        self.create_node_unchecked(input)
2678    }
2679
2680    fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2681        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2682        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2683            input.properties.iter().map(|(key, _)| key.as_str()),
2684            &format!("edge '{}'", input.collection),
2685        )?;
2686        ensure_non_tree_structural_edge_label(&input.label)?;
2687        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2688        self.create_edge_unchecked(input)
2689    }
2690
2691    fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2692        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2693        let db = self.db();
2694        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2695        contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2696        ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2697        let mut metadata = input.metadata;
2698        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2699        let mut builder = db.vector(&input.collection).dense(input.dense);
2700
2701        if let Some(content) = input.content {
2702            builder = builder.content(content);
2703        }
2704
2705        for (key, value) in metadata {
2706            builder = builder.metadata(key, value);
2707        }
2708
2709        if let Some(link_row) = input.link_row {
2710            builder = builder.link_to_table(link_row);
2711        }
2712
2713        if let Some(link_node) = input.link_node {
2714            builder = builder.link_to_node(link_node);
2715        }
2716
2717        let id = builder.save()?;
2718        // Phase 1.1 MVCC universal: stamp xmin on the vector so
2719        // concurrent ANN scans hide it until the transaction commits.
2720        self.stamp_xmin_if_in_txn(&input.collection, id);
2721        refresh_context_index(&db, &input.collection, id)?;
2722        self.cdc_emit(
2723            crate::replication::cdc::ChangeOperation::Insert,
2724            &input.collection,
2725            id.raw(),
2726            "vector",
2727        );
2728        Ok(CreateEntityOutput {
2729            id,
2730            entity: db.store().get(&input.collection, id),
2731        })
2732    }
2733
2734    fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2735        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2736        let db = self.db();
2737        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2738        contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2739
2740        if let JsonValue::Object(ref map) = input.body {
2741            crate::reserved_fields::ensure_no_reserved_public_item_fields(
2742                map.keys().map(String::as_str),
2743                &format!("document '{}'", input.collection),
2744            )?;
2745        }
2746
2747        // Serialize the full body as Value::Json for the "body" field
2748        let body_bytes = json_to_vec(&input.body).map_err(|err| {
2749            crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2750        })?;
2751        let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2752            "body".to_string(),
2753            crate::storage::schema::Value::Json(body_bytes),
2754        )];
2755
2756        // Flatten top-level keys from the body into named fields for filtering
2757        if let JsonValue::Object(ref map) = input.body {
2758            for (key, value) in map {
2759                let storage_value = json_to_storage_value(value)?;
2760                fields.push((key.clone(), storage_value));
2761            }
2762        }
2763
2764        let mut metadata = input.metadata;
2765        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2766        let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2767            .iter()
2768            .map(|(key, value)| (key.as_str(), value.clone()))
2769            .collect();
2770        let mut builder = db.row(&input.collection, columns);
2771
2772        for (key, value) in metadata {
2773            builder = builder.metadata(key, value);
2774        }
2775
2776        for node in input.node_links {
2777            builder = builder.link_to_node(node);
2778        }
2779
2780        for vector in input.vector_links {
2781            builder = builder.link_to_vector(vector);
2782        }
2783
2784        let id = builder.save()?;
2785        // Phase 1.1 MVCC universal: stamp xmin on the document.
2786        self.stamp_xmin_if_in_txn(&input.collection, id);
2787        refresh_context_index(&db, &input.collection, id)?;
2788        self.cdc_emit(
2789            crate::replication::cdc::ChangeOperation::Insert,
2790            &input.collection,
2791            id.raw(),
2792            "document",
2793        );
2794        Ok(CreateEntityOutput {
2795            id,
2796            entity: db.store().get(&input.collection, id),
2797        })
2798    }
2799
2800    fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2801        let db = self.db();
2802        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2803        let declared_model = db
2804            .collection_contract(&input.collection)
2805            .map(|contract| contract.declared_model);
2806        let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2807            contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2808            self.seal_vault_value(&input.collection, input.value)?
2809        } else {
2810            contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2811            input.value
2812        };
2813        let fields = vec![
2814            (
2815                "key".to_string(),
2816                crate::storage::schema::Value::text(input.key),
2817            ),
2818            ("value".to_string(), value),
2819        ];
2820        let collection = input.collection;
2821        let result = self.mutation_engine().apply(
2822            collection.clone(),
2823            vec![crate::runtime::mutation::MutationRow {
2824                fields,
2825                metadata: input.metadata,
2826                node_links: Vec::new(),
2827                vector_links: Vec::new(),
2828            }],
2829        )?;
2830        let id = result.ids[0];
2831        Ok(CreateEntityOutput {
2832            id,
2833            entity: db.store().get(&collection, id),
2834        })
2835    }
2836
2837    fn create_timeseries_point(
2838        &self,
2839        input: CreateTimeSeriesPointInput,
2840    ) -> RedDBResult<CreateEntityOutput> {
2841        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2842        let db = self.db();
2843        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2844        contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2845
2846        let mut fields = vec![
2847            (
2848                "metric".to_string(),
2849                crate::storage::schema::Value::text(input.metric),
2850            ),
2851            (
2852                "value".to_string(),
2853                crate::storage::schema::Value::Float(input.value),
2854            ),
2855        ];
2856
2857        if let Some(timestamp_ns) = input.timestamp_ns {
2858            fields.push((
2859                "timestamp".to_string(),
2860                crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2861            ));
2862        }
2863
2864        if !input.tags.is_empty() {
2865            let tags_json = JsonValue::Object(
2866                input
2867                    .tags
2868                    .into_iter()
2869                    .map(|(key, value)| (key, JsonValue::String(value)))
2870                    .collect(),
2871            );
2872            let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2873                crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2874            })?;
2875            fields.push((
2876                "tags".to_string(),
2877                crate::storage::schema::Value::Json(tags_bytes),
2878            ));
2879        }
2880
2881        let collection = input.collection;
2882        let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2883        // Phase 1.1 MVCC universal: stamp xmin on the point so
2884        // concurrent range scans hide it until COMMIT.
2885        self.stamp_xmin_if_in_txn(&collection, id);
2886        refresh_context_index(&db, &collection, id)?;
2887
2888        Ok(CreateEntityOutput {
2889            id,
2890            entity: db.store().get(&collection, id),
2891        })
2892    }
2893
2894    fn get_kv(
2895        &self,
2896        collection: &str,
2897        key: &str,
2898    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2899        let db = self.db();
2900        ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2901        let store = db.store();
2902        let Some(manager) = store.get_collection(collection) else {
2903            return Ok(None);
2904        };
2905        let entities = manager.query_all(|_| true);
2906        for entity in entities {
2907            if let crate::storage::EntityData::Row(ref row) = entity.data {
2908                if let Some(ref named) = row.named {
2909                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2910                        if &**k == key {
2911                            let value = named
2912                                .get("value")
2913                                .cloned()
2914                                .unwrap_or(crate::storage::schema::Value::Null);
2915                            return Ok(Some((value, entity.id)));
2916                        }
2917                    }
2918                }
2919            }
2920        }
2921        Ok(None)
2922    }
2923
2924    fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2925        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2926        let found = self.get_kv(collection, key)?;
2927        if let Some((_, id)) = found {
2928            let db = self.db();
2929            let store = db.store();
2930            let deleted = store
2931                .delete(collection, id)
2932                .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2933            if deleted {
2934                store.context_index().remove_entity(id);
2935            }
2936            Ok(deleted)
2937        } else {
2938            Ok(false)
2939        }
2940    }
2941
2942    fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2943        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2944        let PatchEntityInput {
2945            collection,
2946            id,
2947            payload,
2948            operations,
2949        } = input;
2950        let db = self.db();
2951        let store = db.store();
2952        let Some(manager) = store.get_collection(&collection) else {
2953            return Err(crate::RedDBError::NotFound(format!(
2954                "collection not found: {collection}"
2955            )));
2956        };
2957        let Some(entity) = manager.get(id) else {
2958            return Err(crate::RedDBError::NotFound(format!(
2959                "entity not found: {}",
2960                id.raw()
2961            )));
2962        };
2963        self.apply_loaded_patch_entity(collection, entity, payload, operations)
2964    }
2965
2966    fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
2967        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2968        let store = self.db().store();
2969        // Snapshot row fields before delete so we can mirror the removal
2970        // into every secondary index. The fetch is best-effort: if the
2971        // entity is already gone, the delete below is a no-op anyway.
2972        let pre_delete_fields = store
2973            .get(&input.collection, input.id)
2974            .as_ref()
2975            .map(entity_row_fields_snapshot)
2976            .unwrap_or_default();
2977        // Store delete first (source of truth). Crash between here and index removal
2978        // leaves the entity invisible to most queries but recoverable; the reverse
2979        // (remove index first, then crash) leaves the entity permanently orphaned.
2980        let deleted = store
2981            .delete(&input.collection, input.id)
2982            .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2983        if deleted {
2984            store.context_index().remove_entity(input.id);
2985            // Secondary index maintenance — surface only registry-shape
2986            // errors; missing-index removals are tolerated inside the call.
2987            if !pre_delete_fields.is_empty() {
2988                self.index_store_ref()
2989                    .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
2990                    .map_err(crate::RedDBError::Internal)?;
2991            }
2992            self.cdc_emit(
2993                crate::replication::cdc::ChangeOperation::Delete,
2994                &input.collection,
2995                input.id.raw(),
2996                "entity",
2997            );
2998        }
2999        Ok(DeleteEntityOutput {
3000            deleted,
3001            id: input.id,
3002        })
3003    }
3004}