Skip to main content

reddb_server/application/
ports_impls_entity.rs

1use std::collections::HashMap;
2
3use crate::application::entity::{
4    AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5    RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8    has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
// Key prefix `"red.tree."`; presumably namespaces tree-related metadata
// entries. NOTE(review): no use is visible in this part of the file —
// confirm against the call sites.
const TREE_METADATA_PREFIX: &str = "red.tree.";
// Label `"TREE_CHILD"`; presumably the edge label linking a tree node to
// its children. NOTE(review): confirm against the call sites.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21    db: &crate::storage::unified::devx::RedDB,
22    collection: &str,
23    metadata: &mut Vec<(String, MetadataValue)>,
24) {
25    if has_internal_ttl_metadata(metadata) {
26        return;
27    }
28
29    let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30        return;
31    };
32
33    metadata.push((
34        "_ttl_ms".to_string(),
35        if default_ttl_ms <= i64::MAX as u64 {
36            MetadataValue::Int(default_ttl_ms as i64)
37        } else {
38            MetadataValue::Timestamp(default_ttl_ms)
39        },
40    ));
41}
42
43fn refresh_context_index(
44    db: &crate::storage::unified::devx::RedDB,
45    collection: &str,
46    id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48    let store = db.store();
49    let Some(entity) = store.get(collection, id) else {
50        return Ok(());
51    };
52
53    store.context_index().index_entity(collection, &entity);
54    Ok(())
55}
56
57/// Pull `(name, value)` pairs for every named column on a row entity.
58/// Returns empty if the entity is not a row, or if the row carries
59/// neither a `named` map nor a `schema` Arc — both of those mean the
60/// names aren't recoverable here, so secondary-index maintenance has
61/// nothing to act on. Used by the delete + update paths.
62pub(crate) fn entity_row_fields_snapshot(
63    entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65    let crate::storage::EntityData::Row(row) = &entity.data else {
66        return Vec::new();
67    };
68    if let Some(named) = &row.named {
69        return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70    }
71    if let Some(schema) = &row.schema {
72        return schema
73            .iter()
74            .zip(row.columns.iter())
75            .map(|(name, value)| (name.clone(), value.clone()))
76            .collect();
77    }
78    Vec::new()
79}
80
81fn ensure_collection_model_contract(
82    db: &crate::storage::unified::devx::RedDB,
83    collection: &str,
84    requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86    if let Some(contract) = db.collection_contract(collection) {
87        if !contract_enforces_model(&contract) {
88            return Ok(());
89        }
90        if collection_model_allows(contract.declared_model, requested_model) {
91            return Ok(());
92        }
93        return Err(crate::RedDBError::InvalidOperation(format!(
94            "collection '{}' is declared as '{}' and does not allow '{}' writes",
95            collection,
96            collection_model_name(contract.declared_model),
97            collection_model_name(requested_model)
98        )));
99    }
100
101    let now = implicit_contract_unix_ms();
102    db.save_collection_contract(crate::physical::CollectionContract {
103        name: collection.to_string(),
104        declared_model: requested_model,
105        schema_mode: crate::catalog::SchemaMode::Dynamic,
106        origin: crate::physical::ContractOrigin::Implicit,
107        version: 1,
108        created_at_unix_ms: now,
109        updated_at_unix_ms: now,
110        default_ttl_ms: db.collection_default_ttl_ms(collection),
111        vector_dimension: None,
112        vector_metric: None,
113        context_index_fields: Vec::new(),
114        declared_columns: Vec::new(),
115        table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116            .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117        timestamps_enabled: false,
118        context_index_enabled: false,
119        metrics_raw_retention_ms: None,
120        metrics_rollup_policies: Vec::new(),
121        metrics_tenant_identity: None,
122        metrics_namespace: None,
123        // Implicit contracts are created on first write — mutability
124        // is the default until the operator runs explicit DDL.
125        append_only: false,
126        subscriptions: Vec::new(),
127        analytics_config: Vec::new(),
128        session_key: None,
129        session_gap_ms: None,
130        retention_duration_ms: None,
131    })
132    .map(|_| ())
133    .map_err(|err| crate::RedDBError::Internal(err.to_string()))
134}
135
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn implicit_contract_unix_ms() -> u128 {
    std::time::UNIX_EPOCH
        .elapsed()
        .map_or(0, |since_epoch| since_epoch.as_millis())
}
142
143fn collection_model_allows(
144    declared_model: crate::catalog::CollectionModel,
145    requested_model: crate::catalog::CollectionModel,
146) -> bool {
147    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
148}
149
150fn ensure_vector_dimension_contract(
151    db: &crate::storage::unified::devx::RedDB,
152    collection: &str,
153    actual_dimension: usize,
154) -> RedDBResult<()> {
155    let Some(expected_dimension) = db
156        .collection_contract(collection)
157        .and_then(|contract| contract.vector_dimension)
158    else {
159        return Ok(());
160    };
161    if expected_dimension == actual_dimension {
162        return Ok(());
163    }
164    Err(crate::RedDBError::Query(format!(
165        "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
166    )))
167}
168
169fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
170    match model {
171        crate::catalog::CollectionModel::Table => "table",
172        crate::catalog::CollectionModel::Document => "document",
173        crate::catalog::CollectionModel::Graph => "graph",
174        crate::catalog::CollectionModel::Vector => "vector",
175        crate::catalog::CollectionModel::Hll => "hll",
176        crate::catalog::CollectionModel::Sketch => "sketch",
177        crate::catalog::CollectionModel::Filter => "filter",
178        crate::catalog::CollectionModel::Kv => "kv",
179        crate::catalog::CollectionModel::Config => "config",
180        crate::catalog::CollectionModel::Vault => "vault",
181        crate::catalog::CollectionModel::Mixed => "mixed",
182        crate::catalog::CollectionModel::TimeSeries => "timeseries",
183        crate::catalog::CollectionModel::Queue => "queue",
184        crate::catalog::CollectionModel::Metrics => "metrics",
185    }
186}
187
/// A single uniqueness requirement resolved from a collection contract:
/// the combination of values in `columns` must not repeat across rows.
#[derive(Clone)]
struct UniquenessRule {
    /// Rule name reported in constraint-violation error messages.
    name: String,
    /// Columns whose combined values must be unique.
    columns: Vec<String>,
    /// `true` for primary-key rules: a null or missing column is an error.
    /// For plain unique rules a null or missing column skips the check.
    primary_key: bool,
}
194
/// Selects how runtime-managed timestamp columns are treated while
/// normalizing row fields against a contract with timestamps enabled.
#[derive(Copy, Clone, PartialEq, Eq)]
enum NormalizeMode {
    /// First write for this row. `created_at` and `updated_at` are both
    /// auto-filled with the current time, and a payload that names either
    /// column is rejected.
    Insert,
    /// Update/patch path. A `created_at` value already present in the
    /// incoming fields is carried through unchanged and `updated_at` is
    /// bumped to now. Values supplied for these columns are overwritten
    /// by the normalizer rather than rejected.
    Update,
}
206
207mod collection_contract_enforcement {
208    use super::*;
209
210    pub(super) struct CollectionContractWriteEnforcer<'a> {
211        db: &'a crate::storage::unified::devx::RedDB,
212        collection: &'a str,
213    }
214
215    impl<'a> CollectionContractWriteEnforcer<'a> {
216        pub(super) fn new(
217            db: &'a crate::storage::unified::devx::RedDB,
218            collection: &'a str,
219        ) -> Self {
220            Self { db, collection }
221        }
222
223        pub(super) fn ensure_model(
224            &self,
225            requested_model: crate::catalog::CollectionModel,
226        ) -> RedDBResult<()> {
227            ensure_collection_model_contract(self.db, self.collection, requested_model)
228        }
229
230        pub(super) fn normalize_insert_fields(
231            &self,
232            fields: Vec<(String, Value)>,
233        ) -> RedDBResult<Vec<(String, Value)>> {
234            normalize_row_fields_for_contract_with_mode(
235                self.db,
236                self.collection,
237                fields,
238                NormalizeMode::Insert,
239            )
240        }
241
242        pub(super) fn normalize_update_fields(
243            &self,
244            fields: Vec<(String, Value)>,
245        ) -> RedDBResult<Vec<(String, Value)>> {
246            normalize_row_fields_for_contract_with_mode(
247                self.db,
248                self.collection,
249                fields,
250                NormalizeMode::Update,
251            )
252        }
253
254        pub(super) fn enforce_row_uniqueness(
255            &self,
256            fields: &[(String, Value)],
257            exclude_id: Option<crate::storage::EntityId>,
258        ) -> RedDBResult<()> {
259            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
260        }
261
262        pub(super) fn enforce_batch_uniqueness(
263            &self,
264            rows: &[Vec<(String, Value)>],
265        ) -> RedDBResult<()> {
266            enforce_row_batch_uniqueness(self.db, self.collection, rows)
267        }
268
269        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
270            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
271        }
272    }
273}
274
275use collection_contract_enforcement::CollectionContractWriteEnforcer;
276
277fn normalize_row_fields_for_contract_with_mode(
278    db: &crate::storage::unified::devx::RedDB,
279    collection: &str,
280    fields: Vec<(String, Value)>,
281    mode: NormalizeMode,
282) -> RedDBResult<Vec<(String, Value)>> {
283    let Some(contract) = db.collection_contract(collection) else {
284        return Ok(fields);
285    };
286
287    if contract.declared_model != crate::catalog::CollectionModel::Table
288        || (contract.declared_columns.is_empty()
289            && contract
290                .table_def
291                .as_ref()
292                .map(|table| table.columns.is_empty())
293                .unwrap_or(true))
294    {
295        return Ok(fields);
296    }
297
298    // Capture the pre-normalize value of created_at (if present) so
299    // Update mode can preserve it. Also capture updated_at to detect
300    // user attempts to set it via the patch payload.
301    //
302    // Heuristic for Update mode: if fields ALREADY contains a
303    // `created_at` whose value matches the row's on-disk entity, the
304    // caller is the patch pipeline carrying forward an auto-populated
305    // column — not a user mutation. Allow pass-through in that case,
306    // then restore the original value at the end.
307    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
308        fields
309            .iter()
310            .find(|(n, _)| n == "created_at")
311            .map(|(_, v)| v.clone())
312    } else {
313        None
314    };
315
316    // Reject user attempts to set runtime-managed timestamp columns.
317    // On Insert we reject any mention; on Update we only reject when
318    // the patch pipeline handed us a NEW value (not the one we
319    // auto-populated during the last insert).
320    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
321        for (name, _) in &fields {
322            if name == "created_at" || name == "updated_at" {
323                return Err(crate::RedDBError::Query(format!(
324                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
325                    collection, name
326                )));
327            }
328        }
329    }
330
331    let mut provided = std::collections::BTreeMap::new();
332    for (name, value) in &fields {
333        provided.insert(name.clone(), value.clone());
334    }
335
336    let resolved_columns = resolved_contract_columns(&contract)?;
337    let declared_names: std::collections::BTreeSet<String> = resolved_columns
338        .iter()
339        .map(|column| column.name.clone())
340        .collect();
341    let unknown_fields: Vec<String> = fields
342        .iter()
343        .filter(|(name, _)| !declared_names.contains(name))
344        .map(|(name, _)| name.clone())
345        .collect();
346    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
347        && !unknown_fields.is_empty()
348    {
349        return Err(crate::RedDBError::Query(format!(
350            "collection '{}' is strict and does not allow undeclared fields: {}",
351            collection,
352            unknown_fields.join(", ")
353        )));
354    }
355    let mut normalized = Vec::new();
356    let now_ms = current_unix_ms_u64();
357
358    for column in &resolved_columns {
359        match provided.remove(&column.name) {
360            Some(value) => {
361                // Runtime-managed columns on Update: always overwrite
362                // with the runtime's own value (preserved created_at
363                // or fresh updated_at). User mutations are silently
364                // discarded because we reject them earlier.
365                if contract.timestamps_enabled && mode == NormalizeMode::Update {
366                    match column.name.as_str() {
367                        "created_at" => {
368                            normalized.push((
369                                column.name.clone(),
370                                existing_created_at
371                                    .clone()
372                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
373                            ));
374                            continue;
375                        }
376                        "updated_at" => {
377                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
378                            continue;
379                        }
380                        _ => {}
381                    }
382                }
383                normalized.push((
384                    column.name.clone(),
385                    normalize_contract_value(collection, column, value)?,
386                ));
387            }
388            None => {
389                // Runtime-managed timestamp columns: auto-fill with now
390                // when the contract opted in. Both get the same value on
391                // first insert so callers can order by either.
392                if contract.timestamps_enabled
393                    && (column.name == "created_at" || column.name == "updated_at")
394                {
395                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
396                    continue;
397                }
398                if let Some(default) = &column.default {
399                    normalized.push((
400                        column.name.clone(),
401                        coerce_contract_literal(collection, &column.name, column, default)?,
402                    ));
403                } else if column.not_null {
404                    return Err(crate::RedDBError::Query(format!(
405                        "missing required column '{}' for collection '{}'",
406                        column.name, collection
407                    )));
408                }
409            }
410        }
411    }
412
413    for (name, value) in fields {
414        if !declared_names.contains(&name) {
415            normalized.push((name, value));
416        }
417    }
418
419    Ok(normalized)
420}
421
/// Current wall-clock time as milliseconds since the Unix epoch, narrowed
/// to `u64`. Yields `0` when the system clock is set before the epoch.
fn current_unix_ms_u64() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
428
429fn enforce_row_uniqueness(
430    db: &crate::storage::unified::devx::RedDB,
431    collection: &str,
432    fields: &[(String, Value)],
433    exclude_id: Option<crate::storage::EntityId>,
434) -> RedDBResult<()> {
435    let Some(contract) = db.collection_contract(collection) else {
436        return Ok(());
437    };
438    if contract.declared_model != crate::catalog::CollectionModel::Table
439        && contract.declared_model != crate::catalog::CollectionModel::Mixed
440    {
441        return Ok(());
442    }
443
444    let rules = resolved_uniqueness_rules(&contract);
445    if rules.is_empty() {
446        return Ok(());
447    }
448
449    let store = db.store();
450    let Some(manager) = store.get_collection(collection) else {
451        return Ok(());
452    };
453
454    let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
455
456    for rule in &rules {
457        let mut expected_signatures = Vec::new();
458        let mut skip_rule = false;
459
460        for column in &rule.columns {
461            match input_fields.get(column) {
462                Some(Value::Null) | None if rule.primary_key => {
463                    return Err(crate::RedDBError::Query(format!(
464                        "primary key '{}' in collection '{}' requires non-null column '{}'",
465                        rule.name, collection, column
466                    )))
467                }
468                Some(Value::Null) | None => {
469                    skip_rule = true;
470                    break;
471                }
472                Some(value) => {
473                    expected_signatures.push((column.clone(), value_signature(value)));
474                }
475            }
476        }
477
478        if skip_rule {
479            continue;
480        }
481
482        for entity in manager.query_all(|_| true) {
483            if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
484                continue;
485            }
486            let Some(existing_fields) = row_fields_from_entity(&entity) else {
487                continue;
488            };
489
490            let duplicate = expected_signatures.iter().all(|(column, expected)| {
491                existing_fields
492                    .get(column)
493                    .map(|value| value_signature(value) == *expected)
494                    .unwrap_or(false)
495            });
496
497            if duplicate {
498                let qualifier = if rule.primary_key {
499                    "primary key"
500                } else {
501                    "unique constraint"
502                };
503                return Err(crate::RedDBError::Query(format!(
504                    "{} '{}' violated on collection '{}' for columns [{}]",
505                    qualifier,
506                    rule.name,
507                    collection,
508                    rule.columns.join(", ")
509                )));
510            }
511        }
512    }
513
514    Ok(())
515}
516
517fn enforce_row_batch_uniqueness(
518    db: &crate::storage::unified::devx::RedDB,
519    collection: &str,
520    rows: &[Vec<(String, Value)>],
521) -> RedDBResult<()> {
522    let Some(contract) = db.collection_contract(collection) else {
523        return Ok(());
524    };
525    if contract.declared_model != crate::catalog::CollectionModel::Table
526        && contract.declared_model != crate::catalog::CollectionModel::Mixed
527    {
528        return Ok(());
529    }
530
531    let rules = resolved_uniqueness_rules(&contract);
532    if rules.is_empty() {
533        return Ok(());
534    }
535
536    for rule in &rules {
537        let mut seen = std::collections::HashMap::<String, usize>::new();
538        for (row_index, fields) in rows.iter().enumerate() {
539            let input_fields: std::collections::BTreeMap<String, Value> =
540                fields.iter().cloned().collect();
541            let mut signatures = Vec::new();
542            let mut skip_rule = false;
543
544            for column in &rule.columns {
545                match input_fields.get(column) {
546                    Some(Value::Null) | None if rule.primary_key => {
547                        return Err(crate::RedDBError::Query(format!(
548                            "primary key '{}' in collection '{}' requires non-null column '{}'",
549                            rule.name, collection, column
550                        )))
551                    }
552                    Some(Value::Null) | None => {
553                        skip_rule = true;
554                        break;
555                    }
556                    Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
557                }
558            }
559
560            if skip_rule {
561                continue;
562            }
563
564            let signature = signatures.join("|");
565            if let Some(previous_index) = seen.insert(signature, row_index) {
566                return Err(crate::RedDBError::Query(format!(
567                    "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
568                    rule.name,
569                    collection,
570                    previous_index + 1,
571                    row_index + 1
572                )));
573            }
574        }
575    }
576
577    Ok(())
578}
579
580fn row_update_requires_uniqueness_check(
581    db: &crate::storage::unified::devx::RedDB,
582    collection: &str,
583    modified_columns: &[String],
584) -> bool {
585    if modified_columns.is_empty() {
586        return false;
587    }
588
589    let Some(contract) = db.collection_contract(collection) else {
590        return false;
591    };
592    if contract.declared_model != crate::catalog::CollectionModel::Table
593        && contract.declared_model != crate::catalog::CollectionModel::Mixed
594    {
595        return false;
596    }
597
598    let rules = resolved_uniqueness_rules(&contract);
599    if rules.is_empty() {
600        return false;
601    }
602
603    rules.iter().any(|rule| {
604        rule.columns.iter().any(|column| {
605            modified_columns
606                .iter()
607                .any(|modified| modified.eq_ignore_ascii_case(column))
608        })
609    })
610}
611
612pub(crate) fn build_row_update_contract_plan(
613    db: &crate::storage::unified::devx::RedDB,
614    collection: &str,
615) -> RedDBResult<Option<RowUpdateContractPlan>> {
616    let Some(contract) = db.collection_contract(collection) else {
617        return Ok(None);
618    };
619
620    let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
621        && !(contract.declared_columns.is_empty()
622            && contract
623                .table_def
624                .as_ref()
625                .map(|table| table.columns.is_empty())
626                .unwrap_or(true))
627    {
628        resolved_contract_columns(&contract)?
629            .into_iter()
630            .map(|rule| {
631                (
632                    rule.name.clone(),
633                    RowUpdateColumnRule {
634                        name: rule.name,
635                        data_type: rule.data_type,
636                        data_type_name: rule.data_type_name,
637                        not_null: rule.not_null,
638                        enum_variants: rule.enum_variants,
639                    },
640                )
641            })
642            .collect()
643    } else {
644        HashMap::new()
645    };
646
647    let unique_columns = if matches!(
648        contract.declared_model,
649        crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
650    ) {
651        resolved_uniqueness_rules(&contract)
652            .into_iter()
653            .flat_map(|rule| rule.columns.into_iter())
654            .map(|column| (column, ()))
655            .collect()
656    } else {
657        HashMap::new()
658    };
659
660    Ok(Some(RowUpdateContractPlan {
661        timestamps_enabled: contract.timestamps_enabled,
662        strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
663        declared_rules,
664        unique_columns,
665    }))
666}
667
668pub(crate) fn normalize_row_update_assignment_with_plan(
669    collection: &str,
670    column: &str,
671    value: Value,
672    row_contract_plan: Option<&RowUpdateContractPlan>,
673) -> RedDBResult<Value> {
674    let Some(plan) = row_contract_plan else {
675        return Ok(value);
676    };
677
678    if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
679        return Err(crate::RedDBError::Query(format!(
680            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
681            collection, column
682        )));
683    }
684
685    if let Some(rule) = plan.declared_rules.get(column) {
686        let rule = ResolvedColumnRule {
687            name: rule.name.clone(),
688            data_type: rule.data_type,
689            data_type_name: rule.data_type_name.clone(),
690            not_null: rule.not_null,
691            default: None,
692            enum_variants: rule.enum_variants.clone(),
693        };
694        normalize_contract_value(collection, &rule, value)
695    } else if plan.strict_schema {
696        Err(crate::RedDBError::Query(format!(
697            "collection '{}' is strict and does not allow undeclared fields: {}",
698            collection, column
699        )))
700    } else {
701        Ok(value)
702    }
703}
704
705pub(crate) fn normalize_row_update_value_for_rule(
706    collection: &str,
707    value: Value,
708    row_rule: Option<&RowUpdateColumnRule>,
709) -> RedDBResult<Value> {
710    let Some(rule) = row_rule else {
711        return Ok(value);
712    };
713
714    let rule = ResolvedColumnRule {
715        name: rule.name.clone(),
716        data_type: rule.data_type,
717        data_type_name: rule.data_type_name.clone(),
718        not_null: rule.not_null,
719        default: None,
720        enum_variants: rule.enum_variants.clone(),
721    };
722    normalize_contract_value(collection, &rule, value)
723}
724
725fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
726    if let Some(named) = row.named.as_mut() {
727        named.insert(name.to_string(), value);
728        return;
729    }
730
731    if let Some(schema) = row.schema.as_ref() {
732        if let Some(index) = schema.iter().position(|column| column == name) {
733            if let Some(slot) = row.columns.get_mut(index) {
734                *slot = value;
735                return;
736            }
737        }
738
739        let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
740        for (column, current) in schema.iter().zip(row.columns.iter()) {
741            named.insert(column.clone(), current.clone());
742        }
743        named.insert(name.to_string(), value);
744        row.named = Some(named);
745        return;
746    }
747
748    let mut named = HashMap::with_capacity(1);
749    named.insert(name.to_string(), value);
750    row.named = Some(named);
751}
752
753fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
754    if let Some(named) = row.named.as_ref() {
755        named
756            .iter()
757            .map(|(key, value)| (key.clone(), value.clone()))
758            .collect()
759    } else if let Some(schema) = row.schema.as_ref() {
760        schema
761            .iter()
762            .cloned()
763            .zip(row.columns.iter().cloned())
764            .collect()
765    } else {
766        Vec::new()
767    }
768}
769
770fn apply_row_field_assignments_raw<I>(
771    row: &mut crate::storage::unified::entity::RowData,
772    field_assignments: I,
773) where
774    I: IntoIterator<Item = (String, Value)>,
775{
776    for (column, value) in field_assignments {
777        set_row_field(row, &column, value);
778    }
779}
780
781fn apply_row_field_assignments_incremental<I>(
782    collection: &str,
783    row: &mut crate::storage::unified::entity::RowData,
784    field_assignments: I,
785    row_contract_plan: Option<&RowUpdateContractPlan>,
786) -> RedDBResult<()>
787where
788    I: IntoIterator<Item = (String, Value)>,
789{
790    for (column, value) in field_assignments {
791        let value = normalize_row_update_assignment_with_plan(
792            collection,
793            &column,
794            value,
795            row_contract_plan,
796        )?;
797
798        set_row_field(row, &column, value);
799    }
800
801    Ok(())
802}
803
804fn resolved_uniqueness_rules(
805    contract: &crate::physical::CollectionContract,
806) -> Vec<UniquenessRule> {
807    let mut rules = Vec::new();
808
809    if let Some(table_def) = &contract.table_def {
810        if !table_def.primary_key.is_empty() {
811            rules.push(UniquenessRule {
812                name: "primary_key".to_string(),
813                columns: table_def.primary_key.clone(),
814                primary_key: true,
815            });
816        }
817
818        for constraint in &table_def.constraints {
819            if matches!(
820                constraint.constraint_type,
821                crate::storage::schema::ConstraintType::PrimaryKey
822            ) && !constraint.columns.is_empty()
823            {
824                rules.push(UniquenessRule {
825                    name: constraint.name.clone(),
826                    columns: constraint.columns.clone(),
827                    primary_key: true,
828                });
829            } else if matches!(
830                constraint.constraint_type,
831                crate::storage::schema::ConstraintType::Unique
832            ) && !constraint.columns.is_empty()
833            {
834                rules.push(UniquenessRule {
835                    name: constraint.name.clone(),
836                    columns: constraint.columns.clone(),
837                    primary_key: false,
838                });
839            }
840        }
841    } else {
842        for column in &contract.declared_columns {
843            if column.primary_key {
844                rules.push(UniquenessRule {
845                    name: format!("pk_{}", column.name),
846                    columns: vec![column.name.clone()],
847                    primary_key: true,
848                });
849            } else if column.unique {
850                rules.push(UniquenessRule {
851                    name: format!("uniq_{}", column.name),
852                    columns: vec![column.name.clone()],
853                    primary_key: false,
854                });
855            }
856        }
857    }
858
859    let mut dedup = std::collections::BTreeSet::new();
860    rules
861        .into_iter()
862        .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
863        .collect()
864}
865
866fn row_fields_from_entity(
867    entity: &crate::storage::UnifiedEntity,
868) -> Option<std::collections::BTreeMap<String, Value>> {
869    match &entity.data {
870        crate::storage::EntityData::Row(row) => {
871            if let Some(named) = &row.named {
872                Some(
873                    named
874                        .iter()
875                        .map(|(key, value)| (key.clone(), value.clone()))
876                        .collect(),
877                )
878            } else {
879                row.schema.as_ref().map(|schema| {
880                    schema
881                        .iter()
882                        .cloned()
883                        .zip(row.columns.iter().cloned())
884                        .collect()
885                })
886            }
887        }
888        _ => None,
889    }
890}
891
892fn value_signature(value: &Value) -> String {
893    format!("{value:?}")
894}
895
896fn normalize_contract_value(
897    collection: &str,
898    column: &ResolvedColumnRule,
899    value: Value,
900) -> RedDBResult<Value> {
901    if matches!(value, Value::Null) {
902        if column.not_null {
903            return Err(crate::RedDBError::Query(format!(
904                "column '{}' in collection '{}' cannot be null",
905                column.name, collection
906            )));
907        }
908        return Ok(Value::Null);
909    }
910
911    let target = column.data_type;
912    if value_matches_declared_type(&value, target) {
913        return Ok(value);
914    }
915
916    let Some(raw) = value_to_coercion_input(&value) else {
917        return Err(crate::RedDBError::Query(format!(
918            "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
919            column.name, collection, column.data_type_name
920        )));
921    };
922
923    coerce_contract_literal(collection, &column.name, column, &raw)
924}
925
926fn coerce_contract_literal(
927    collection: &str,
928    column_name: &str,
929    column: &ResolvedColumnRule,
930    raw: &str,
931) -> RedDBResult<Value> {
932    let target = column.data_type;
933    match target {
934        DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
935        DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
936        DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
937            crate::RedDBError::Query(format!(
938                "failed to coerce column '{}' in collection '{}' to '{}': {}",
939                column_name, collection, column.data_type_name, err
940            ))
941        }),
942        DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
943            crate::RedDBError::Query(format!(
944                "failed to coerce column '{}' in collection '{}' to '{}': {}",
945                column_name, collection, column.data_type_name, err
946            ))
947        }),
948        DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
949            "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
950            column_name, collection, column.data_type_name
951        ))),
952        _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
953            |err| {
954                crate::RedDBError::Query(format!(
955                    "failed to coerce column '{}' in collection '{}' to '{}': {}",
956                    column_name, collection, column.data_type_name, err
957                ))
958            },
959        ),
960    }
961}
962
963struct ResolvedColumnRule {
964    name: String,
965    data_type: DataType,
966    data_type_name: String,
967    not_null: bool,
968    default: Option<String>,
969    enum_variants: Vec<String>,
970}
971
972fn resolved_contract_columns(
973    contract: &crate::physical::CollectionContract,
974) -> RedDBResult<Vec<ResolvedColumnRule>> {
975    if let Some(table_def) = &contract.table_def {
976        return Ok(table_def
977            .columns
978            .iter()
979            .map(|column| ResolvedColumnRule {
980                name: column.name.clone(),
981                data_type: column.data_type,
982                data_type_name: data_type_name(column.data_type).to_string(),
983                not_null: !column.nullable,
984                default: column
985                    .default
986                    .as_ref()
987                    .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
988                enum_variants: column.enum_variants.clone(),
989            })
990            .collect());
991    }
992
993    contract
994        .declared_columns
995        .iter()
996        .map(|column| {
997            let data_type = column
998                .sql_type
999                .as_ref()
1000                .map(crate::storage::query::resolve_sql_type_name)
1001                .transpose()
1002                .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1003                .unwrap_or(parse_declared_data_type(&column.data_type)?);
1004            Ok(ResolvedColumnRule {
1005                name: column.name.clone(),
1006                data_type,
1007                data_type_name: column.data_type.clone(),
1008                not_null: column.not_null,
1009                default: column.default.clone(),
1010                enum_variants: column.enum_variants.clone(),
1011            })
1012        })
1013        .collect()
1014}
1015
1016fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1017    resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1018}
1019
1020fn data_type_name(data_type: DataType) -> &'static str {
1021    match data_type {
1022        DataType::Integer => "integer",
1023        DataType::UnsignedInteger => "unsigned_integer",
1024        DataType::Float => "float",
1025        DataType::Text => "text",
1026        DataType::Blob => "blob",
1027        DataType::Boolean => "boolean",
1028        DataType::Timestamp => "timestamp",
1029        DataType::Duration => "duration",
1030        DataType::IpAddr => "ipaddr",
1031        DataType::MacAddr => "macaddr",
1032        DataType::Vector => "vector",
1033        DataType::Nullable => "nullable",
1034        DataType::Unknown => "unknown",
1035        DataType::Json => "json",
1036        DataType::Uuid => "uuid",
1037        DataType::NodeRef => "noderef",
1038        DataType::EdgeRef => "edgeref",
1039        DataType::VectorRef => "vectorref",
1040        DataType::RowRef => "rowref",
1041        DataType::Color => "color",
1042        DataType::Email => "email",
1043        DataType::Url => "url",
1044        DataType::Phone => "phone",
1045        DataType::Semver => "semver",
1046        DataType::Cidr => "cidr",
1047        DataType::Date => "date",
1048        DataType::Time => "time",
1049        DataType::Decimal => "decimal",
1050        DataType::Enum => "enum",
1051        DataType::Array => "array",
1052        DataType::TimestampMs => "timestamp_ms",
1053        DataType::Ipv4 => "ipv4",
1054        DataType::Ipv6 => "ipv6",
1055        DataType::Subnet => "subnet",
1056        DataType::Port => "port",
1057        DataType::Latitude => "latitude",
1058        DataType::Longitude => "longitude",
1059        DataType::GeoPoint => "geopoint",
1060        DataType::Country2 => "country2",
1061        DataType::Country3 => "country3",
1062        DataType::Lang2 => "lang2",
1063        DataType::Lang5 => "lang5",
1064        DataType::Currency => "currency",
1065        DataType::AssetCode => "asset_code",
1066        DataType::Money => "money",
1067        DataType::ColorAlpha => "color_alpha",
1068        DataType::BigInt => "bigint",
1069        DataType::KeyRef => "keyref",
1070        DataType::DocRef => "docref",
1071        DataType::TableRef => "tableref",
1072        DataType::PageRef => "pageref",
1073        DataType::Secret => "secret",
1074        DataType::Password => "password",
1075        DataType::TextZstd => "text",
1076        DataType::BlobZstd => "blob",
1077    }
1078}
1079
1080fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1081    matches!(
1082        (value, target),
1083        (Value::Null, _)
1084            | (Value::Integer(_), DataType::Integer)
1085            | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1086            | (Value::Float(_), DataType::Float)
1087            | (Value::Text(_), DataType::Text)
1088            | (Value::Blob(_), DataType::Blob)
1089            | (Value::Boolean(_), DataType::Boolean)
1090            | (Value::Timestamp(_), DataType::Timestamp)
1091            | (Value::Duration(_), DataType::Duration)
1092            | (Value::IpAddr(_), DataType::IpAddr)
1093            | (Value::MacAddr(_), DataType::MacAddr)
1094            | (Value::Vector(_), DataType::Vector)
1095            | (Value::Json(_), DataType::Json)
1096            | (Value::Uuid(_), DataType::Uuid)
1097            | (Value::NodeRef(_), DataType::NodeRef)
1098            | (Value::EdgeRef(_), DataType::EdgeRef)
1099            | (Value::VectorRef(_, _), DataType::VectorRef)
1100            | (Value::RowRef(_, _), DataType::RowRef)
1101            | (Value::Color(_), DataType::Color)
1102            | (Value::Email(_), DataType::Email)
1103            | (Value::Url(_), DataType::Url)
1104            | (Value::Phone(_), DataType::Phone)
1105            | (Value::Semver(_), DataType::Semver)
1106            | (Value::Cidr(_, _), DataType::Cidr)
1107            | (Value::Date(_), DataType::Date)
1108            | (Value::Time(_), DataType::Time)
1109            | (Value::Decimal(_), DataType::Decimal)
1110            | (Value::EnumValue(_), DataType::Enum)
1111            | (Value::Array(_), DataType::Array)
1112            | (Value::TimestampMs(_), DataType::TimestampMs)
1113            | (Value::Ipv4(_), DataType::Ipv4)
1114            | (Value::Ipv6(_), DataType::Ipv6)
1115            | (Value::Subnet(_, _), DataType::Subnet)
1116            | (Value::Port(_), DataType::Port)
1117            | (Value::Latitude(_), DataType::Latitude)
1118            | (Value::Longitude(_), DataType::Longitude)
1119            | (Value::GeoPoint(_, _), DataType::GeoPoint)
1120            | (Value::Country2(_), DataType::Country2)
1121            | (Value::Country3(_), DataType::Country3)
1122            | (Value::Lang2(_), DataType::Lang2)
1123            | (Value::Lang5(_), DataType::Lang5)
1124            | (Value::Currency(_), DataType::Currency)
1125            | (Value::ColorAlpha(_), DataType::ColorAlpha)
1126            | (Value::BigInt(_), DataType::BigInt)
1127            | (Value::KeyRef(_, _), DataType::KeyRef)
1128            | (Value::DocRef(_, _), DataType::DocRef)
1129            | (Value::TableRef(_), DataType::TableRef)
1130            | (Value::PageRef(_), DataType::PageRef)
1131            | (Value::Secret(_), DataType::Secret)
1132            | (Value::Password(_), DataType::Password)
1133    )
1134}
1135
1136fn value_to_coercion_input(value: &Value) -> Option<String> {
1137    match value {
1138        Value::Null => None,
1139        Value::Integer(value) => Some(value.to_string()),
1140        Value::UnsignedInteger(value) => Some(value.to_string()),
1141        Value::Float(value) => Some(value.to_string()),
1142        Value::Text(value) => Some(value.to_string()),
1143        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1144        Value::Boolean(value) => Some(value.to_string()),
1145        Value::Timestamp(value) => Some(value.to_string()),
1146        Value::Duration(value) => Some(value.to_string()),
1147        Value::IpAddr(value) => Some(value.to_string()),
1148        Value::MacAddr(value) => Some(format!(
1149            "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1150            value[0], value[1], value[2], value[3], value[4], value[5]
1151        )),
1152        Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1153        Value::Email(value) => Some(value.clone()),
1154        Value::Url(value) => Some(value.clone()),
1155        Value::Phone(value) => Some(value.to_string()),
1156        Value::Semver(value) => Some(format!(
1157            "{}.{}.{}",
1158            value / 1_000_000,
1159            (value / 1_000) % 1_000,
1160            value % 1_000
1161        )),
1162        Value::Date(value) => Some(value.to_string()),
1163        Value::Time(value) => Some(value.to_string()),
1164        Value::Decimal(value) => Some(value.to_string()),
1165        Value::TimestampMs(value) => Some(value.to_string()),
1166        Value::Ipv4(value) => Some(format!(
1167            "{}.{}.{}.{}",
1168            (value >> 24) & 0xFF,
1169            (value >> 16) & 0xFF,
1170            (value >> 8) & 0xFF,
1171            value & 0xFF
1172        )),
1173        Value::Port(value) => Some(value.to_string()),
1174        Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1175        Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1176        Value::GeoPoint(lat, lon) => Some(format!(
1177            "{},{}",
1178            *lat as f64 / 1_000_000.0,
1179            *lon as f64 / 1_000_000.0
1180        )),
1181        Value::BigInt(value) => Some(value.to_string()),
1182        Value::TableRef(value) => Some(value.clone()),
1183        Value::PageRef(value) => Some(value.to_string()),
1184        Value::Password(value) => Some(value.clone()),
1185        _ => None,
1186    }
1187}
1188
/// Removes duplicate column names, comparing ASCII-case-insensitively.
///
/// The first spelling of each name is kept and the original order of first
/// occurrences is preserved.
fn dedupe_modified_columns(modified_columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(modified_columns.len());
    for candidate in modified_columns {
        let already_kept = kept
            .iter()
            .any(|seen| seen.eq_ignore_ascii_case(&candidate));
        if !already_kept {
            kept.push(candidate);
        }
    }
    kept
}
1205
1206fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1207    if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1208        return Err(crate::RedDBError::Query(
1209            "array positional document patch paths are unsupported; replace the array or full document body instead"
1210                .to_string(),
1211        ));
1212    }
1213    Ok(())
1214}
1215
1216fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1217    match fields.get("body") {
1218        Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1219            crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1220        }),
1221        Some(_) => Err(crate::RedDBError::Query(
1222            "document body field must contain JSON".to_string(),
1223        )),
1224        None => Ok(JsonValue::Object(Default::default())),
1225    }
1226}
1227
1228fn replace_document_row_body(
1229    fields: &mut HashMap<String, Value>,
1230    body: JsonValue,
1231    modified_columns: &mut Vec<String>,
1232) -> RedDBResult<()> {
1233    modified_columns.push("body".to_string());
1234
1235    let old_keys: Vec<String> = fields
1236        .keys()
1237        .filter(|key| key.as_str() != "body")
1238        .cloned()
1239        .collect();
1240    for key in old_keys {
1241        fields.remove(&key);
1242        modified_columns.push(key);
1243    }
1244
1245    let body_bytes = json_to_vec(&body).map_err(|err| {
1246        crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1247    })?;
1248    fields.insert("body".to_string(), Value::Json(body_bytes));
1249
1250    if let JsonValue::Object(map) = &body {
1251        for (key, value) in map {
1252            fields.insert(key.clone(), json_to_storage_value(value)?);
1253            modified_columns.push(key.clone());
1254        }
1255    }
1256
1257    Ok(())
1258}
1259
1260impl RedDBRuntime {
1261    pub(crate) fn apply_loaded_patch_entity_core(
1262        &self,
1263        collection: String,
1264        mut entity: crate::storage::UnifiedEntity,
1265        payload: JsonValue,
1266        operations: Vec<PatchEntityOperation>,
1267    ) -> RedDBResult<AppliedEntityMutation> {
1268        let id = entity.id;
1269        let operations = normalize_ttl_patch_operations(operations)?;
1270        // Snapshot pre-patch row fields for the secondary-index hook —
1271        // empty for non-row entities, which is the desired no-op.
1272        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1273
1274        let db = self.db();
1275        let store = db.store();
1276        let Some(manager) = store.get_collection(&collection) else {
1277            return Err(crate::RedDBError::NotFound(format!(
1278                "collection not found: {collection}"
1279            )));
1280        };
1281
1282        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1283        let mut metadata_changed = false;
1284        let mut modified_columns: Vec<String> = Vec::new();
1285        let mut context_index_dirty = false;
1286        let mut graph_node_type: Option<String> = None;
1287        let mut graph_edge_weight: Option<f32> = None;
1288
1289        let row_contract_timestamps = db
1290            .collection_contract(&collection)
1291            .map(|c| c.timestamps_enabled)
1292            .unwrap_or(false);
1293
1294        match &mut entity.data {
1295            crate::storage::EntityData::Row(row) => {
1296                let is_document_collection = db
1297                    .collection_contract(&collection)
1298                    .map(|contract| {
1299                        contract.declared_model == crate::catalog::CollectionModel::Document
1300                    })
1301                    .unwrap_or(false);
1302                let mut field_ops = Vec::new();
1303                let mut metadata_ops = Vec::new();
1304                let mut document_body_ops = Vec::new();
1305
1306                for mut op in operations {
1307                    let Some(root) = op.path.first().map(String::as_str) else {
1308                        return Err(crate::RedDBError::Query(
1309                            "patch path cannot be empty".to_string(),
1310                        ));
1311                    };
1312
1313                    match root {
1314                        "body" if is_document_collection => {
1315                            if op.path.len() < 2 {
1316                                return Err(crate::RedDBError::Query(
1317                                    "document body patch paths require a nested key; use payload.body for full replacement"
1318                                        .to_string(),
1319                                ));
1320                            }
1321                            op.path.remove(0);
1322                            reject_document_array_position_path(&op.path)?;
1323                            document_body_ops.push(op);
1324                        }
1325                        "fields" | "named" => {
1326                            if op.path.len() < 2 {
1327                                return Err(crate::RedDBError::Query(
1328                                    "patch path 'fields' requires a nested key".to_string(),
1329                                ));
1330                            }
1331                            if row_contract_timestamps {
1332                                let leaf = op.path.get(1).map(String::as_str);
1333                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
1334                                    return Err(crate::RedDBError::Query(format!(
1335                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1336                                        collection,
1337                                        leaf.unwrap_or("")
1338                                    )));
1339                                }
1340                            }
1341                            op.path.remove(0);
1342                            field_ops.push(op);
1343                        }
1344                        "metadata" => {
1345                            if op.path.len() < 2 {
1346                                return Err(crate::RedDBError::Query(
1347                                    "patch path 'metadata' requires a nested key".to_string(),
1348                                ));
1349                            }
1350                            op.path.remove(0);
1351                            metadata_ops.push(op);
1352                        }
1353                        _ => {
1354                            return Err(crate::RedDBError::Query(format!(
1355                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1356                            )));
1357                        }
1358                    }
1359                }
1360
1361                if !document_body_ops.is_empty() {
1362                    context_index_dirty = true;
1363                    let named = row.named.get_or_insert_with(Default::default);
1364                    let mut body = document_body_from_named(named)?;
1365                    apply_patch_operations_to_json(&mut body, &document_body_ops)
1366                        .map_err(crate::RedDBError::Query)?;
1367                    replace_document_row_body(named, body, &mut modified_columns)?;
1368                }
1369
1370                if is_document_collection {
1371                    if let Some(body) = payload.get("body") {
1372                        context_index_dirty = true;
1373                        let named = row.named.get_or_insert_with(Default::default);
1374                        replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1375                    }
1376                }
1377
1378                if !field_ops.is_empty() {
1379                    context_index_dirty = true;
1380                    for op in &field_ops {
1381                        if let Some(col) = op.path.first() {
1382                            modified_columns.push(col.clone());
1383                        }
1384                    }
1385                    let named = row.named.get_or_insert_with(Default::default);
1386                    apply_patch_operations_to_storage_map(named, &field_ops)?;
1387                }
1388
1389                if let Some(fields) = payload
1390                    .get("fields")
1391                    .and_then(crate::json::Value::as_object)
1392                {
1393                    if row_contract_timestamps {
1394                        for key in fields.keys() {
1395                            if key == "created_at" || key == "updated_at" {
1396                                return Err(crate::RedDBError::Query(format!(
1397                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1398                                    collection, key
1399                                )));
1400                            }
1401                        }
1402                    }
1403                    context_index_dirty = true;
1404                    let named = row.named.get_or_insert_with(Default::default);
1405                    for (key, value) in fields {
1406                        modified_columns.push(key.clone());
1407                        named.insert(key.clone(), json_to_storage_value(value)?);
1408                    }
1409                }
1410
1411                if !metadata_ops.is_empty() {
1412                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1413                    let metadata = patch_metadata.get_or_insert_with(|| {
1414                        store.get_metadata(&collection, id).unwrap_or_default()
1415                    });
1416                    let mut metadata_json = metadata_to_json(metadata);
1417                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1418                        .map_err(crate::RedDBError::Query)?;
1419                    *metadata = metadata_from_json(&metadata_json)?;
1420                    metadata_changed = true;
1421                }
1422
1423                if !modified_columns.is_empty() || row_contract_timestamps {
1424                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1425                    let current_fields = if let Some(named) = row.named.take() {
1426                        named.into_iter().collect::<Vec<_>>()
1427                    } else if let Some(schema) = row.schema.as_ref() {
1428                        schema
1429                            .iter()
1430                            .cloned()
1431                            .zip(row.columns.iter().cloned())
1432                            .collect::<Vec<_>>()
1433                    } else {
1434                        Vec::new()
1435                    };
1436                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
1437                    if row_contract_timestamps {
1438                        modified_columns.push("updated_at".to_string());
1439                        context_index_dirty = true;
1440                    }
1441                    if contract.requires_uniqueness_check(&modified_columns) {
1442                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1443                    }
1444                    row.named = Some(normalized_fields.into_iter().collect());
1445                }
1446            }
1447            crate::storage::EntityData::Node(node) => {
1448                let mut field_ops = Vec::new();
1449                let mut metadata_ops = Vec::new();
1450                let mut node_type_ops = Vec::new();
1451
1452                for mut op in operations {
1453                    let Some(root) = op.path.first().map(String::as_str) else {
1454                        return Err(crate::RedDBError::Query(
1455                            "patch path cannot be empty".to_string(),
1456                        ));
1457                    };
1458
1459                    match root {
1460                        "fields" | "properties" => {
1461                            if op.path.len() < 2 {
1462                                return Err(crate::RedDBError::Query(
1463                                    "patch path 'fields' requires a nested key".to_string(),
1464                                ));
1465                            }
1466                            op.path.remove(0);
1467                            field_ops.push(op);
1468                        }
1469                        "metadata" => {
1470                            if op.path.len() < 2 {
1471                                return Err(crate::RedDBError::Query(
1472                                    "patch path 'metadata' requires a nested key".to_string(),
1473                                ));
1474                            }
1475                            op.path.remove(0);
1476                            metadata_ops.push(op);
1477                        }
1478                        "node_type" => {
1479                            if op.path.len() != 1 {
1480                                return Err(crate::RedDBError::Query(
1481                                    "patch path 'node_type' does not allow nested keys".to_string(),
1482                                ));
1483                            }
1484                            op.path.clear();
1485                            node_type_ops.push(op);
1486                        }
1487                        _ => {
1488                            return Err(crate::RedDBError::Query(format!(
1489                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1490                            )));
1491                        }
1492                    }
1493                }
1494
1495                for op in node_type_ops {
1496                    context_index_dirty = true;
1497                    let value = op.value.ok_or_else(|| {
1498                        crate::RedDBError::Query("node_type operations require a value".to_string())
1499                    })?;
1500
1501                    match op.op {
1502                        PatchEntityOperationType::Unset => {
1503                            return Err(crate::RedDBError::Query(
1504                                "node_type cannot be unset through patch operations".to_string(),
1505                            ));
1506                        }
1507                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1508                            let Some(node_type) = value.as_str() else {
1509                                return Err(crate::RedDBError::Query(
1510                                    "node_type operation requires a text value".to_string(),
1511                                ));
1512                            };
1513                            graph_node_type = Some(node_type.to_string());
1514                            modified_columns.push("node_type".to_string());
1515                        }
1516                    }
1517                }
1518
1519                if !field_ops.is_empty() {
1520                    context_index_dirty = true;
1521                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1522                }
1523
1524                if let Some(fields) = payload
1525                    .get("fields")
1526                    .and_then(crate::json::Value::as_object)
1527                {
1528                    context_index_dirty = true;
1529                    for (key, value) in fields {
1530                        node.properties
1531                            .insert(key.clone(), json_to_storage_value(value)?);
1532                    }
1533                }
1534
1535                if !metadata_ops.is_empty() {
1536                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1537                    let metadata = patch_metadata.get_or_insert_with(|| {
1538                        store.get_metadata(&collection, id).unwrap_or_default()
1539                    });
1540                    let mut metadata_json = metadata_to_json(metadata);
1541                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1542                        .map_err(crate::RedDBError::Query)?;
1543                    *metadata = metadata_from_json(&metadata_json)?;
1544                    metadata_changed = true;
1545                }
1546            }
1547            crate::storage::EntityData::Edge(edge) => {
1548                let mut field_ops = Vec::new();
1549                let mut metadata_ops = Vec::new();
1550                let mut weight_ops = Vec::new();
1551
1552                for mut op in operations {
1553                    let Some(root) = op.path.first().map(String::as_str) else {
1554                        return Err(crate::RedDBError::Query(
1555                            "patch path cannot be empty".to_string(),
1556                        ));
1557                    };
1558
1559                    match root {
1560                        "fields" | "properties" => {
1561                            if op.path.len() < 2 {
1562                                return Err(crate::RedDBError::Query(
1563                                    "patch path 'fields' requires a nested key".to_string(),
1564                                ));
1565                            }
1566                            op.path.remove(0);
1567                            field_ops.push(op);
1568                        }
1569                        "weight" => {
1570                            if op.path.len() != 1 {
1571                                return Err(crate::RedDBError::Query(
1572                                    "patch path 'weight' does not allow nested keys".to_string(),
1573                                ));
1574                            }
1575                            op.path.clear();
1576                            weight_ops.push(op);
1577                        }
1578                        "metadata" => {
1579                            if op.path.len() < 2 {
1580                                return Err(crate::RedDBError::Query(
1581                                    "patch path 'metadata' requires a nested key".to_string(),
1582                                ));
1583                            }
1584                            op.path.remove(0);
1585                            metadata_ops.push(op);
1586                        }
1587                        _ => {
1588                            return Err(crate::RedDBError::Query(format!(
1589                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1590                            )));
1591                        }
1592                    }
1593                }
1594
1595                if !field_ops.is_empty() {
1596                    context_index_dirty = true;
1597                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1598                }
1599
1600                for op in weight_ops {
1601                    context_index_dirty = true;
1602                    let value = op.value.ok_or_else(|| {
1603                        crate::RedDBError::Query("weight operations require a value".to_string())
1604                    })?;
1605
1606                    match op.op {
1607                        PatchEntityOperationType::Unset => {
1608                            return Err(crate::RedDBError::Query(
1609                                "weight cannot be unset through patch operations".to_string(),
1610                            ));
1611                        }
1612                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1613                            let Some(weight) = value.as_f64() else {
1614                                return Err(crate::RedDBError::Query(
1615                                    "weight operation requires a numeric value".to_string(),
1616                                ));
1617                            };
1618                            graph_edge_weight = Some(weight as f32);
1619                            edge.weight = weight as f32;
1620                            modified_columns.push("weight".to_string());
1621                        }
1622                    }
1623                }
1624
1625                if let Some(fields) = payload
1626                    .get("fields")
1627                    .and_then(crate::json::Value::as_object)
1628                {
1629                    context_index_dirty = true;
1630                    for (key, value) in fields {
1631                        edge.properties
1632                            .insert(key.clone(), json_to_storage_value(value)?);
1633                    }
1634                }
1635
1636                if !metadata_ops.is_empty() {
1637                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1638                    let metadata = patch_metadata.get_or_insert_with(|| {
1639                        store.get_metadata(&collection, id).unwrap_or_default()
1640                    });
1641                    let mut metadata_json = metadata_to_json(metadata);
1642                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1643                        .map_err(crate::RedDBError::Query)?;
1644                    *metadata = metadata_from_json(&metadata_json)?;
1645                    metadata_changed = true;
1646                }
1647            }
1648            crate::storage::EntityData::Vector(vector) => {
1649                let mut field_ops = Vec::new();
1650                let mut metadata_ops = Vec::new();
1651
1652                for mut op in operations {
1653                    let Some(root) = op.path.first().map(String::as_str) else {
1654                        return Err(crate::RedDBError::Query(
1655                            "patch path cannot be empty".to_string(),
1656                        ));
1657                    };
1658
1659                    match root {
1660                        "fields" => {
1661                            if op.path.len() < 2 {
1662                                return Err(crate::RedDBError::Query(
1663                                    "patch path 'fields' requires a nested key".to_string(),
1664                                ));
1665                            }
1666                            op.path.remove(0);
1667                            let Some(target) = op.path.first().map(String::as_str) else {
1668                                return Err(crate::RedDBError::Query(
1669                                    "patch path requires a target under fields".to_string(),
1670                                ));
1671                            };
1672                            if !matches!(target, "dense" | "content" | "sparse") {
1673                                return Err(crate::RedDBError::Query(format!(
1674                                    "unsupported vector patch target '{target}'"
1675                                )));
1676                            }
1677                            field_ops.push(op);
1678                        }
1679                        "metadata" => {
1680                            if op.path.len() < 2 {
1681                                return Err(crate::RedDBError::Query(
1682                                    "patch path 'metadata' requires a nested key".to_string(),
1683                                ));
1684                            }
1685                            op.path.remove(0);
1686                            metadata_ops.push(op);
1687                        }
1688                        _ => {
1689                            return Err(crate::RedDBError::Query(format!(
1690                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1691                            )));
1692                        }
1693                    }
1694                }
1695
1696                if !field_ops.is_empty() {
1697                    context_index_dirty = true;
1698                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1699                }
1700
1701                if let Some(fields) = payload
1702                    .get("fields")
1703                    .and_then(crate::json::Value::as_object)
1704                {
1705                    context_index_dirty = true;
1706                    if let Some(content) =
1707                        fields.get("content").and_then(crate::json::Value::as_str)
1708                    {
1709                        vector.content = Some(content.to_string());
1710                    }
1711                    if let Some(dense) = fields.get("dense") {
1712                        vector.dense = dense
1713                            .as_array()
1714                            .ok_or_else(|| {
1715                                crate::RedDBError::Query(
1716                                    "field 'dense' must be an array".to_string(),
1717                                )
1718                            })?
1719                            .iter()
1720                            .map(|value| {
1721                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
1722                                    crate::RedDBError::Query(
1723                                        "field 'dense' must contain only numbers".to_string(),
1724                                    )
1725                                })
1726                            })
1727                            .collect::<Result<Vec<_>, _>>()?;
1728                    }
1729                }
1730
1731                if !metadata_ops.is_empty() {
1732                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1733                    let metadata = patch_metadata.get_or_insert_with(|| {
1734                        store.get_metadata(&collection, id).unwrap_or_default()
1735                    });
1736                    let mut metadata_json = metadata_to_json(metadata);
1737                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1738                        .map_err(crate::RedDBError::Query)?;
1739                    *metadata = metadata_from_json(&metadata_json)?;
1740                    metadata_changed = true;
1741                }
1742            }
1743            crate::storage::EntityData::TimeSeries(_)
1744            | crate::storage::EntityData::QueueMessage(_) => {
1745                return Err(crate::RedDBError::Query(
1746                    "patch operations are not supported for TimeSeries or QueueMessage entities"
1747                        .to_string(),
1748                ));
1749            }
1750        }
1751
1752        if let Some(node_type) = graph_node_type {
1753            if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1754                node.node_type = node_type;
1755            }
1756        }
1757        if let Some(weight) = graph_edge_weight {
1758            if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1759                edge.weight = (weight * 1000.0) as u32;
1760            }
1761        }
1762
1763        if let Some(metadata) = payload
1764            .get("metadata")
1765            .and_then(crate::json::Value::as_object)
1766        {
1767            let patch_metadata = patch_metadata
1768                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1769            for (key, value) in metadata {
1770                ensure_non_tree_reserved_metadata_key(key)?;
1771                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1772            }
1773            metadata_changed = true;
1774        }
1775
1776        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1777            let patch_metadata = patch_metadata
1778                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1779            if matches!(value, crate::storage::unified::MetadataValue::Null) {
1780                patch_metadata.remove(&key);
1781            } else {
1782                patch_metadata.set(key, value);
1783            }
1784            metadata_changed = true;
1785        }
1786
1787        entity.updated_at = std::time::SystemTime::now()
1788            .duration_since(std::time::UNIX_EPOCH)
1789            .unwrap_or_default()
1790            .as_secs();
1791
1792        modified_columns = dedupe_modified_columns(modified_columns);
1793
1794        Ok(AppliedEntityMutation {
1795            id,
1796            collection,
1797            entity,
1798            metadata: patch_metadata,
1799            modified_columns,
1800            persist_metadata: metadata_changed,
1801            context_index_dirty,
1802            replaced_entity: None,
1803            replaced_entity_previous_xmax: 0,
1804            pre_mutation_fields,
1805        })
1806    }
1807
1808    pub(crate) fn apply_loaded_sql_update_row_core(
1809        &self,
1810        collection: String,
1811        mut entity: crate::storage::UnifiedEntity,
1812        static_field_assignments: &[(String, Value)],
1813        dynamic_field_assignments: Vec<(String, Value)>,
1814        static_metadata_assignments: &[(String, MetadataValue)],
1815        dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1816        row_contract_plan: Option<&RowUpdateContractPlan>,
1817        row_modified_columns_template: &[String],
1818        row_touches_unique_columns: bool,
1819    ) -> RedDBResult<AppliedEntityMutation> {
1820        let id = entity.id;
1821        let previous_xmax = entity.xmax;
1822        let db = self.db();
1823        let store = db.store();
1824        let Some(_) = store.get_collection(&collection) else {
1825            return Err(crate::RedDBError::NotFound(format!(
1826                "collection not found: {collection}"
1827            )));
1828        };
1829
1830        let versioned_update_xid = match self.current_xid() {
1831            Some(xid) => Some(xid),
1832            None => {
1833                let snapshot_manager = self.snapshot_manager();
1834                let xid = snapshot_manager.begin();
1835                snapshot_manager.commit(xid);
1836                Some(xid)
1837            }
1838        };
1839        let mut replaced_entity = versioned_update_xid.map(|xid| {
1840            let mut old = entity.clone();
1841            old.set_xmax(xid);
1842            old
1843        });
1844
1845        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1846        let row_contract_timestamps = row_contract_plan
1847            .map(|plan| plan.timestamps_enabled)
1848            .unwrap_or(false);
1849        let mut metadata_changed = false;
1850        let mut modified_columns = row_modified_columns_template.to_vec();
1851        let mut context_index_dirty = !modified_columns.is_empty();
1852
1853        // Snapshot OLD field values BEFORE applying the assignments —
1854        // the secondary-index maintenance hook needs both before/after to
1855        // delete-then-insert under changed indexed columns.
1856        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1857
1858        let crate::storage::EntityData::Row(row) = &mut entity.data else {
1859            return Err(crate::RedDBError::Query(
1860                "SQL row update fast path requires a row entity".to_string(),
1861            ));
1862        };
1863
1864        let _ = row_contract_plan;
1865        apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1866        apply_row_field_assignments_raw(row, dynamic_field_assignments);
1867
1868        for (key, value) in static_metadata_assignments
1869            .iter()
1870            .cloned()
1871            .chain(dynamic_metadata_assignments)
1872        {
1873            ensure_non_tree_reserved_metadata_key(&key)?;
1874            patch_metadata
1875                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1876                .set(key, value);
1877            metadata_changed = true;
1878        }
1879
1880        if !modified_columns.is_empty() || row_contract_timestamps {
1881            let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1882            if row_contract_timestamps {
1883                context_index_dirty = true;
1884                set_row_field(
1885                    row,
1886                    "updated_at",
1887                    Value::UnsignedInteger(current_unix_ms_u64()),
1888                );
1889                modified_columns.push("updated_at".to_string());
1890            }
1891            if row_touches_unique_columns {
1892                let current_fields = collect_row_fields(row);
1893                contract.enforce_row_uniqueness(&current_fields, Some(id))?;
1894            }
1895        }
1896
1897        entity.updated_at = std::time::SystemTime::now()
1898            .duration_since(std::time::UNIX_EPOCH)
1899            .unwrap_or_default()
1900            .as_secs();
1901
1902        if let Some(xid) = versioned_update_xid {
1903            let logical_id = entity.logical_id();
1904            entity.id = store.next_entity_id();
1905            entity.set_logical_id(logical_id);
1906            entity.set_xmin(xid);
1907            entity.set_xmax(0);
1908            if let Some(old) = replaced_entity.as_mut() {
1909                old.set_xmax(xid);
1910            }
1911        }
1912
1913        modified_columns = dedupe_modified_columns(modified_columns);
1914
1915        Ok(AppliedEntityMutation {
1916            id: entity.id,
1917            collection,
1918            entity,
1919            metadata: patch_metadata,
1920            modified_columns,
1921            persist_metadata: metadata_changed,
1922            context_index_dirty,
1923            replaced_entity,
1924            replaced_entity_previous_xmax: previous_xmax,
1925            pre_mutation_fields,
1926        })
1927    }
1928
1929    pub(crate) fn persist_applied_entity_mutations(
1930        &self,
1931        applied: &[AppliedEntityMutation],
1932    ) -> RedDBResult<()> {
1933        if applied.is_empty() {
1934            return Ok(());
1935        }
1936
1937        let store = self.db().store();
1938        let collection = &applied[0].collection;
1939        let Some(manager) = store.get_collection(collection) else {
1940            return Err(crate::RedDBError::NotFound(format!(
1941                "collection not found: {collection}"
1942            )));
1943        };
1944
1945        let mut ordinary = Vec::with_capacity(applied.len());
1946        for item in applied {
1947            if let Some(old_version) = item.replaced_entity.as_ref() {
1948                store
1949                    .install_versioned_table_row_update(
1950                        collection,
1951                        old_version.clone(),
1952                        item.entity.clone(),
1953                        if item.persist_metadata {
1954                            item.metadata.as_ref()
1955                        } else {
1956                            None
1957                        },
1958                    )
1959                    .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1960                if self.current_xid().is_some() {
1961                    self.record_pending_versioned_update(
1962                        crate::runtime::impl_core::current_connection_id(),
1963                        collection,
1964                        old_version.id,
1965                        item.entity.id,
1966                        old_version.xmax,
1967                        item.replaced_entity_previous_xmax,
1968                    );
1969                }
1970            } else {
1971                ordinary.push(item);
1972            }
1973        }
1974        if ordinary.is_empty() {
1975            return Ok(());
1976        }
1977
1978        manager
1979            .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1980                (
1981                    &item.entity,
1982                    item.modified_columns.as_slice(),
1983                    if item.persist_metadata {
1984                        item.metadata.as_ref()
1985                    } else {
1986                        None
1987                    },
1988                )
1989            }))
1990            .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1991
1992        // PG-HOT-like fast path: segment in-place is done; when no
1993        // mutation touches a secondary-indexed column AND no metadata
1994        // payload needs to be folded into a B-tree record, skip the
1995        // in-line B-tree upsert. The WAL still records the update
1996        // (durability preserved; recovery replay rebuilds the B-tree),
1997        // and `manager.get()` prefers the live segment over the
1998        // B-tree for reads — so the short-circuit is invisible to
1999        // callers. See `persist_entities_to_pager_wal_only`.
2000        let indexed_cols = self
2001            .index_store_ref()
2002            .indexed_columns_set(collection.as_str());
2003        let all_hot = !indexed_cols.is_empty()
2004            && ordinary.iter().all(|item| {
2005                !item.persist_metadata
2006                    && !item
2007                        .modified_columns
2008                        .iter()
2009                        .any(|c| indexed_cols.contains(c))
2010            })
2011            || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2012
2013        // Pass `&[&UnifiedEntity]` — no per-entity clone. The SQL UPDATE
2014        // inner loop hands us `applied` which already owns the post-image
2015        // entity; all we need for the persist path is a read borrow.
2016        let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2017            ordinary.iter().map(|item| &item.entity).collect();
2018        let persist_fn = if all_hot {
2019            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2020        } else {
2021            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2022        };
2023        persist_fn(store.as_ref(), collection, &entity_refs)
2024            .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2025    }
2026
2027    pub(crate) fn flush_applied_entity_mutation(
2028        &self,
2029        applied: &AppliedEntityMutation,
2030    ) -> RedDBResult<()> {
2031        let store = self.db().store();
2032        if applied.context_index_dirty {
2033            store
2034                .context_index()
2035                .index_entity(&applied.collection, &applied.entity);
2036        }
2037        // Secondary-index maintenance for SQL UPDATE / JSON-Patch flows.
2038        // Skip when pre_mutation_fields is empty (entity wasn't a row, or
2039        // didn't carry recoverable column names) — there's nothing to
2040        // delete-then-insert in that case.
2041        //
2042        // Also build the CDC damage vector here so downstream consumers
2043        // see which columns changed without re-diffing.
2044        let mut changed_columns: Option<Vec<String>> = None;
2045        if !applied.pre_mutation_fields.is_empty() {
2046            let post = entity_row_fields_snapshot(&applied.entity);
2047            if !post.is_empty() {
2048                let damage = crate::application::entity::row_damage_vector(
2049                    &applied.pre_mutation_fields,
2050                    &post,
2051                );
2052                if !damage.is_empty() {
2053                    changed_columns = Some(
2054                        damage
2055                            .touched_columns()
2056                            .into_iter()
2057                            .map(str::to_string)
2058                            .collect(),
2059                    );
2060                }
2061
2062                // HOT-like fast path (P3.T2/T3): when no modified
2063                // column is covered by a secondary index, skip the
2064                // `index_entity_update` call entirely. The function
2065                // would short-circuit internally, but the call still
2066                // reads the registry lock + walks the damage vector
2067                // — avoiding it saves a few microseconds per UPDATE.
2068                // Page-local replace + t_ctid chain support (true
2069                // HOT) lives in a follow-up storage spec.
2070                let indexed_cols: std::collections::HashSet<String> = self
2071                    .index_store_ref()
2072                    .list_indices(applied.collection.as_str())
2073                    .into_iter()
2074                    .filter_map(|idx| idx.columns.first().cloned())
2075                    .collect();
2076                let modified_cols: std::collections::HashSet<String> = damage
2077                    .touched_columns()
2078                    .into_iter()
2079                    .map(str::to_string)
2080                    .collect();
2081                if let Some(old_version) = applied.replaced_entity.as_ref() {
2082                    let old_index_fields: Vec<(String, Value)> = applied
2083                        .pre_mutation_fields
2084                        .iter()
2085                        .filter(|(col, _)| indexed_cols.contains(col))
2086                        .cloned()
2087                        .collect();
2088                    let new_index_fields: Vec<(String, Value)> = post
2089                        .iter()
2090                        .filter(|(col, _)| indexed_cols.contains(col))
2091                        .cloned()
2092                        .collect();
2093                    if !old_index_fields.is_empty() {
2094                        self.index_store_ref()
2095                            .index_entity_delete(
2096                                &applied.collection,
2097                                old_version.id,
2098                                &old_index_fields,
2099                            )
2100                            .map_err(crate::RedDBError::Internal)?;
2101                    }
2102                    if !new_index_fields.is_empty() {
2103                        self.index_store_ref()
2104                            .index_entity_insert(
2105                                &applied.collection,
2106                                applied.entity.id,
2107                                &new_index_fields,
2108                            )
2109                            .map_err(crate::RedDBError::Internal)?;
2110                    }
2111                } else {
2112                    let decision = crate::storage::engine::hot_update::decide(
2113                        &crate::storage::engine::hot_update::HotUpdateInputs {
2114                            collection: applied.collection.as_str(),
2115                            indexed_columns: &indexed_cols,
2116                            modified_columns: &modified_cols,
2117                            // The storage layer currently handles fit via
2118                            // the segment abstraction; we bypass the
2119                            // page-size check here.
2120                            new_tuple_size: 0,
2121                            page_free_space: usize::MAX,
2122                        },
2123                    );
2124                    if !decision.can_hot {
2125                        self.index_store_ref()
2126                            .index_entity_update(
2127                                &applied.collection,
2128                                applied.id,
2129                                &applied.pre_mutation_fields,
2130                                &post,
2131                            )
2132                            .map_err(crate::RedDBError::Internal)?;
2133                    } else {
2134                        // F-04: `applied.collection` is tenant-supplied;
2135                        // strip CR/LF/control bytes via the LogField
2136                        // escaper (ADR 0010).
2137                        tracing::debug!(
2138                            collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2139                            "hot_update fast-path: skipped index_entity_update"
2140                        );
2141                    }
2142                }
2143            }
2144        }
2145        self.cdc_emit_prebuilt_with_columns(
2146            crate::replication::cdc::ChangeOperation::Update,
2147            &applied.collection,
2148            &applied.entity,
2149            "entity",
2150            applied.metadata.as_ref(),
2151            true,
2152            changed_columns,
2153        );
2154        Ok(())
2155    }
2156
2157    pub(crate) fn apply_loaded_patch_entity(
2158        &self,
2159        collection: String,
2160        entity: crate::storage::UnifiedEntity,
2161        payload: JsonValue,
2162        operations: Vec<PatchEntityOperation>,
2163    ) -> RedDBResult<CreateEntityOutput> {
2164        let applied =
2165            self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2166        self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2167        self.flush_applied_entity_mutation(&applied)?;
2168        Ok(CreateEntityOutput {
2169            id: applied.id,
2170            entity: Some(applied.entity),
2171        })
2172    }
2173}
2174
2175fn ensure_non_tree_reserved_metadata_patch_paths(
2176    operations: &[PatchEntityOperation],
2177) -> RedDBResult<()> {
2178    for operation in operations {
2179        let Some(key) = operation.path.first().map(String::as_str) else {
2180            continue;
2181        };
2182        ensure_non_tree_reserved_metadata_key(key)?;
2183    }
2184    Ok(())
2185}
2186
2187fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2188    if key.starts_with(TREE_METADATA_PREFIX) {
2189        return Err(crate::RedDBError::Query(format!(
2190            "metadata key '{}' is reserved for managed trees",
2191            key
2192        )));
2193    }
2194    Ok(())
2195}
2196
2197fn ensure_non_tree_reserved_metadata_entries(
2198    metadata: &[(String, MetadataValue)],
2199) -> RedDBResult<()> {
2200    for (key, _) in metadata {
2201        ensure_non_tree_reserved_metadata_key(key)?;
2202    }
2203    Ok(())
2204}
2205
2206fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2207    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2208        return Err(crate::RedDBError::Query(format!(
2209            "edge label '{}' is reserved for managed trees",
2210            TREE_CHILD_EDGE_LABEL
2211        )));
2212    }
2213    Ok(())
2214}
2215
2216impl RedDBRuntime {
2217    pub(crate) fn create_node_unchecked(
2218        &self,
2219        input: CreateNodeInput,
2220    ) -> RedDBResult<CreateEntityOutput> {
2221        let db = self.db();
2222        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2223        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2224        let mut metadata = input.metadata;
2225        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2226        let mut builder = db.node(&input.collection, &input.label);
2227
2228        if let Some(node_type) = input.node_type {
2229            builder = builder.node_type(node_type);
2230        }
2231
2232        for (key, value) in input.properties {
2233            builder = builder.property(key, value);
2234        }
2235
2236        for (key, value) in metadata {
2237            builder = builder.metadata(key, value);
2238        }
2239
2240        for embedding in input.embeddings {
2241            if let Some(model) = embedding.model {
2242                builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2243            } else {
2244                builder = builder.embedding(embedding.name, embedding.vector);
2245            }
2246        }
2247
2248        for link in input.table_links {
2249            builder = builder.link_to_table(link.key, link.table);
2250        }
2251
2252        for link in input.node_links {
2253            builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2254        }
2255
2256        let id = builder.save()?;
2257        // Phase 1.1 MVCC universal: stamp xmin so concurrent snapshots
2258        // don't see this node until the transaction commits.
2259        self.stamp_xmin_if_in_txn(&input.collection, id);
2260        refresh_context_index(&db, &input.collection, id)?;
2261        self.cdc_emit(
2262            crate::replication::cdc::ChangeOperation::Insert,
2263            &input.collection,
2264            id.raw(),
2265            "graph_node",
2266        );
2267        Ok(CreateEntityOutput {
2268            id,
2269            entity: db.store().get(&input.collection, id),
2270        })
2271    }
2272
2273    pub(crate) fn create_edge_unchecked(
2274        &self,
2275        input: CreateEdgeInput,
2276    ) -> RedDBResult<CreateEntityOutput> {
2277        let db = self.db();
2278        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2279        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2280        let mut metadata = input.metadata;
2281        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2282        let mut builder = db
2283            .edge(&input.collection, &input.label)
2284            .from(input.from)
2285            .to(input.to);
2286
2287        if let Some(weight) = input.weight {
2288            builder = builder.weight(weight);
2289        }
2290
2291        for (key, value) in input.properties {
2292            builder = builder.property(key, value);
2293        }
2294
2295        for (key, value) in metadata {
2296            builder = builder.metadata(key, value);
2297        }
2298
2299        let id = builder.save()?;
2300        // Phase 1.1 MVCC universal: stamp xmin on the edge so other
2301        // sessions don't follow it until COMMIT.
2302        self.stamp_xmin_if_in_txn(&input.collection, id);
2303        refresh_context_index(&db, &input.collection, id)?;
2304        self.cdc_emit(
2305            crate::replication::cdc::ChangeOperation::Insert,
2306            &input.collection,
2307            id.raw(),
2308            "graph_edge",
2309        );
2310        Ok(CreateEntityOutput {
2311            id,
2312            entity: db.store().get(&input.collection, id),
2313        })
2314    }
2315}
2316
2317fn create_rows_batch_prevalidated_columnar_with_outputs(
2318    runtime: &RedDBRuntime,
2319    collection: String,
2320    column_names: std::sync::Arc<Vec<String>>,
2321    rows: Vec<Vec<crate::storage::schema::Value>>,
2322) -> RedDBResult<Vec<CreateEntityOutput>> {
2323    use crate::storage::{
2324        unified::{EntityData, EntityKind, RowData},
2325        EntityId, UnifiedEntity,
2326    };
2327    use std::sync::Arc;
2328
2329    if rows.is_empty() {
2330        return Ok(Vec::new());
2331    }
2332    runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2333    runtime.check_batch_size(rows.len())?;
2334    runtime.check_db_size()?;
2335
2336    let db = runtime.db();
2337    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2338    contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2339
2340    let store = db.store();
2341    let table_arc: Arc<str> = Arc::from(collection.as_str());
2342
2343    let indexed_cols = runtime
2344        .index_store_ref()
2345        .indexed_columns_set(collection.as_str());
2346    let has_secondary_indexes = !indexed_cols.is_empty();
2347    let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2348        if has_secondary_indexes {
2349            Vec::with_capacity(rows.len())
2350        } else {
2351            Vec::new()
2352        };
2353
2354    let entities: Vec<UnifiedEntity> = rows
2355        .into_iter()
2356        .map(|values| {
2357            if has_secondary_indexes {
2358                let mut snap: Vec<(String, crate::storage::schema::Value)> =
2359                    Vec::with_capacity(indexed_cols.len());
2360                for (name, val) in column_names.iter().zip(values.iter()) {
2361                    if indexed_cols.contains(name) {
2362                        snap.push((name.clone(), val.clone()));
2363                    }
2364                }
2365                field_snapshots.push(snap);
2366            }
2367            let mut row = RowData::new(values);
2368            row.schema = Some(Arc::clone(&column_names));
2369            UnifiedEntity::new(
2370                EntityId::new(0),
2371                EntityKind::TableRow {
2372                    table: Arc::clone(&table_arc),
2373                    row_id: 0,
2374                },
2375                EntityData::Row(row),
2376            )
2377        })
2378        .collect();
2379
2380    let ids = store
2381        .bulk_insert(&collection, entities)
2382        .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2383
2384    if has_secondary_indexes {
2385        let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2386            .iter()
2387            .zip(field_snapshots)
2388            .map(|(id, fields)| (*id, fields))
2389            .collect();
2390        runtime
2391            .index_store_ref()
2392            .index_entity_insert_batch(&collection, &index_rows)
2393            .map_err(crate::RedDBError::Internal)?;
2394    }
2395
2396    runtime.invalidate_result_cache();
2397    runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2398
2399    Ok(ids
2400        .into_iter()
2401        .map(|id| CreateEntityOutput { id, entity: None })
2402        .collect())
2403}
2404
2405impl RuntimeEntityPort for RedDBRuntime {
2406    fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2407        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2408        let db = self.db();
2409        let CreateRowInput {
2410            collection,
2411            fields,
2412            metadata: input_metadata,
2413            node_links,
2414            vector_links,
2415        } = input;
2416        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2417        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2418        let mut metadata = input_metadata;
2419        apply_collection_default_ttl(&db, &collection, &mut metadata);
2420        let fields = contract.normalize_insert_fields(fields)?;
2421        contract.enforce_row_uniqueness(&fields, None)?;
2422        // Route through MutationEngine for unified hot path.
2423        let engine = self.mutation_engine();
2424        let result = engine.apply(
2425            collection.clone(),
2426            vec![crate::runtime::mutation::MutationRow {
2427                fields,
2428                metadata,
2429                node_links,
2430                vector_links,
2431            }],
2432        )?;
2433        let id = result.ids[0];
2434        // Perf: `db.get(id)` does a *cross-collection* scan (get_any)
2435        // that also takes a write lock on the entity cache. We know
2436        // the collection — hit the manager directly. Cuts
2437        // create_row() p50 roughly in half on the hot path.
2438        Ok(CreateEntityOutput {
2439            id,
2440            entity: db.store().get(&collection, id),
2441        })
2442    }
2443
2444    fn create_rows_batch(
2445        &self,
2446        input: CreateRowsBatchInput,
2447    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2448        if input.rows.is_empty() {
2449            return Ok(Vec::new());
2450        }
2451        self.check_batch_size(input.rows.len())?;
2452        self.check_db_size()?;
2453
2454        let db = self.db();
2455        let collection = input.collection;
2456        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2457        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2458
2459        let mut prepared_rows = Vec::with_capacity(input.rows.len());
2460        let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2461        for row in input.rows {
2462            if row.collection != collection {
2463                return Err(crate::RedDBError::Query(format!(
2464                    "batch row collection mismatch: expected '{}', got '{}'",
2465                    collection, row.collection
2466                )));
2467            }
2468
2469            let mut metadata = row.metadata;
2470            apply_collection_default_ttl(&db, &collection, &mut metadata);
2471            let fields = contract.normalize_insert_fields(row.fields)?;
2472            contract.enforce_row_uniqueness(&fields, None)?;
2473            uniqueness_rows.push(fields.clone());
2474            prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2475        }
2476
2477        contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2478
2479        // Route through MutationEngine: single bulk_insert + one CDC batch
2480        // instead of N separate cdc_emit() calls (each acquires a write lock).
2481        let engine = {
2482            let e = self.mutation_engine();
2483            if input.suppress_events {
2484                e.with_suppress_events()
2485            } else {
2486                e
2487            }
2488        };
2489        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2490            .into_iter()
2491            .map(|(fields, metadata, node_links, vector_links)| {
2492                crate::runtime::mutation::MutationRow {
2493                    fields,
2494                    metadata,
2495                    node_links,
2496                    vector_links,
2497                }
2498            })
2499            .collect();
2500
2501        let result = engine
2502            .apply(collection.clone(), mutation_rows)
2503            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2504
2505        let store = db.store();
2506        Ok(result
2507            .ids
2508            .into_iter()
2509            .map(|id| CreateEntityOutput {
2510                id,
2511                entity: store.get(&collection, id),
2512            })
2513            .collect())
2514    }
2515
2516    fn create_rows_batch_prevalidated_columnar(
2517        &self,
2518        collection: String,
2519        column_names: std::sync::Arc<Vec<String>>,
2520        rows: Vec<Vec<crate::storage::schema::Value>>,
2521    ) -> RedDBResult<usize> {
2522        create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2523            .map(|outputs| outputs.len())
2524    }
2525
2526    fn create_rows_batch_columnar(
2527        &self,
2528        collection: String,
2529        column_names: std::sync::Arc<Vec<String>>,
2530        rows: Vec<Vec<crate::storage::schema::Value>>,
2531    ) -> RedDBResult<usize> {
2532        self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2533            .map(|outputs| outputs.len())
2534    }
2535
2536    fn create_rows_batch_columnar_with_outputs(
2537        &self,
2538        collection: String,
2539        column_names: std::sync::Arc<Vec<String>>,
2540        rows: Vec<Vec<crate::storage::schema::Value>>,
2541    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2542        if rows.is_empty() {
2543            return Ok(Vec::new());
2544        }
2545        self.check_batch_size(rows.len())?;
2546        self.check_db_size()?;
2547
2548        let db = self.db();
2549        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2550        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2551
2552        // Fast path: when the collection carries no contract (or the
2553        // contract has no declared columns) `normalize_insert_fields`
2554        // is a no-op and `enforce_row_uniqueness` finds no unique
2555        // columns to check. Skip the per-row `(String, Value)` tuple
2556        // materialisation entirely and route through the prevalidated
2557        // columnar kernel — same write, CDC, and index path, just
2558        // without the wasted (String, Value) clones. This is the
2559        // bench `bench_users` shape (no contract declared by the
2560        // adapter's `setup_schema`).
2561        let needs_normalisation = match db.collection_contract(&collection) {
2562            Some(c) => {
2563                c.declared_model == crate::catalog::CollectionModel::Table
2564                    && (!c.declared_columns.is_empty()
2565                        || c.table_def
2566                            .as_ref()
2567                            .map(|t| !t.columns.is_empty())
2568                            .unwrap_or(false))
2569            }
2570            None => false,
2571        };
2572        if !needs_normalisation {
2573            return create_rows_batch_prevalidated_columnar_with_outputs(
2574                self,
2575                collection,
2576                column_names,
2577                rows,
2578            );
2579        }
2580
2581        // Slow path: contract requires per-row normalisation /
2582        // uniqueness checks. Materialise `Vec<(String, Value)>` from
2583        // the columnar layout (this still pays N×ncols `String::clone`
2584        // — but it's deferred out of the wire decoder hot loop and
2585        // happens behind a runtime API, not the per-row decode loop)
2586        // and fall through to the existing `create_rows_batch` path.
2587        let ncols = column_names.len();
2588        let tuple_rows: Vec<CreateRowInput> = rows
2589            .into_iter()
2590            .map(|values| {
2591                let mut fields: Vec<(String, crate::storage::schema::Value)> =
2592                    Vec::with_capacity(ncols);
2593                for (name, value) in column_names.iter().zip(values) {
2594                    fields.push((name.clone(), value));
2595                }
2596                CreateRowInput {
2597                    collection: collection.clone(),
2598                    fields,
2599                    metadata: Vec::new(),
2600                    node_links: Vec::new(),
2601                    vector_links: Vec::new(),
2602                }
2603            })
2604            .collect();
2605        self.create_rows_batch(CreateRowsBatchInput {
2606            collection,
2607            rows: tuple_rows,
2608            suppress_events: false,
2609        })
2610    }
2611
2612    fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2613        if input.rows.is_empty() {
2614            return Ok(0);
2615        }
2616        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2617        self.check_batch_size(input.rows.len())?;
2618        self.check_db_size()?;
2619
2620        let db = self.db();
2621        let collection = input.collection;
2622        // Still verify the collection's declared model before we blast
2623        // rows at it — this one-off check is O(1), independent of
2624        // ncols, and catches schema-kind mismatches that the client
2625        // can't always see.
2626        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2627        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2628
2629        // Hoist the per-collection default TTL lookup out of the
2630        // per-row loop — it only depends on the collection, not on
2631        // individual rows, and the old path did one HashMap read per
2632        // row (25k reads for a 25k typed_insert bulk). Fetch it once,
2633        // apply to any row whose metadata doesn't already carry a TTL.
2634        let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2635
2636        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2637            Vec::with_capacity(input.rows.len());
2638        let mut mutation_rows = mutation_rows;
2639        for row in input.rows {
2640            if row.collection != collection {
2641                return Err(crate::RedDBError::Query(format!(
2642                    "batch row collection mismatch: expected '{}', got '{}'",
2643                    collection, row.collection
2644                )));
2645            }
2646            let mut metadata = row.metadata;
2647            if let Some(ttl) = default_ttl_ms {
2648                if !has_internal_ttl_metadata(&metadata) {
2649                    metadata.push((
2650                        "_ttl_ms".to_string(),
2651                        if ttl <= i64::MAX as u64 {
2652                            MetadataValue::Int(ttl as i64)
2653                        } else {
2654                            MetadataValue::Timestamp(ttl)
2655                        },
2656                    ));
2657                }
2658            }
2659            mutation_rows.push(crate::runtime::mutation::MutationRow {
2660                fields: row.fields,
2661                metadata,
2662                node_links: row.node_links,
2663                vector_links: row.vector_links,
2664            });
2665        }
2666
2667        let engine = self.mutation_engine();
2668        let result = engine
2669            .apply(collection, mutation_rows)
2670            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2671        Ok(result.ids.len())
2672    }
2673
2674    fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2675        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2676        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2677            input.properties.iter().map(|(key, _)| key.as_str()),
2678            &format!("node '{}'", input.collection),
2679        )?;
2680        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2681        self.create_node_unchecked(input)
2682    }
2683
2684    fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2685        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2686        crate::reserved_fields::ensure_no_reserved_public_item_fields(
2687            input.properties.iter().map(|(key, _)| key.as_str()),
2688            &format!("edge '{}'", input.collection),
2689        )?;
2690        ensure_non_tree_structural_edge_label(&input.label)?;
2691        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2692        self.create_edge_unchecked(input)
2693    }
2694
2695    fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2696        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2697        let db = self.db();
2698        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2699        contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2700        ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2701        let mut metadata = input.metadata;
2702        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2703        let mut builder = db.vector(&input.collection).dense(input.dense);
2704
2705        if let Some(content) = input.content {
2706            builder = builder.content(content);
2707        }
2708
2709        for (key, value) in metadata {
2710            builder = builder.metadata(key, value);
2711        }
2712
2713        if let Some(link_row) = input.link_row {
2714            builder = builder.link_to_table(link_row);
2715        }
2716
2717        if let Some(link_node) = input.link_node {
2718            builder = builder.link_to_node(link_node);
2719        }
2720
2721        let id = builder.save()?;
2722        let dense_for_turbo = match db.store().get(&input.collection, id).map(|e| e.data) {
2723            Some(crate::storage::unified::EntityData::Vector(data)) => Some(data.dense.clone()),
2724            _ => None,
2725        };
2726        // Phase 1.1 MVCC universal: stamp xmin on the vector so
2727        // concurrent ANN scans hide it until the transaction commits.
2728        self.stamp_xmin_if_in_txn(&input.collection, id);
2729        refresh_context_index(&db, &input.collection, id)?;
2730        // Issue #693 — vector.turbo write path. Order matters and
2731        // matches the write sequence the next slice (#673) will
2732        // attach kill-points to: WAL append → in-memory index update
2733        // → TurboExtent append. The legacy `vector` path is the
2734        // `None` branch and is untouched.
2735        if let (Some(state), Some(dense)) = (db.turbo_state(&input.collection), dense_for_turbo) {
2736            // Lazy populate so an INSERT after restart sees the
2737            // pre-restart vectors in the index before appending its
2738            // own.
2739            state.ensure_populated(&db.store(), &input.collection);
2740            // Issue #694 — named crash boundaries for #673.
2741            use crate::runtime::turbo_crash_inject::{fire, InjectionPoint};
2742            fire(InjectionPoint::BeforeWalFsync);
2743            let _ = db
2744                .store()
2745                .append_vector_insert_record(&input.collection, id.raw(), &dense);
2746            fire(InjectionPoint::BeforeIndexCommit);
2747            let mut index = state.index.lock();
2748            index.insert(id, dense.clone());
2749            // Mirror the encoded codes into the per-collection
2750            // TurboExtent when one is available. The bytes are the
2751            // 4-bit-packed code groups (`n_byte_groups`) plus the
2752            // little-endian L2 norm — same shape `BlockedCodeStorage`
2753            // appends in-memory. Failures are non-fatal in this
2754            // slice; durability of the extent is owned by #673.
2755            fire(InjectionPoint::BeforeExtentFsync);
2756            if let Some(extent) = state.extent.lock().as_mut() {
2757                let packed = index.encode_for_extent(&dense);
2758                let _ = extent.append(&packed);
2759            }
2760        }
2761        self.cdc_emit(
2762            crate::replication::cdc::ChangeOperation::Insert,
2763            &input.collection,
2764            id.raw(),
2765            "vector",
2766        );
2767        Ok(CreateEntityOutput {
2768            id,
2769            entity: db.store().get(&input.collection, id),
2770        })
2771    }
2772
2773    fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2774        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2775        let db = self.db();
2776        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2777        contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2778
2779        if let JsonValue::Object(ref map) = input.body {
2780            crate::reserved_fields::ensure_no_reserved_public_item_fields(
2781                map.keys().map(String::as_str),
2782                &format!("document '{}'", input.collection),
2783            )?;
2784        }
2785
2786        // Serialize the full body as Value::Json for the "body" field
2787        let body_bytes = json_to_vec(&input.body).map_err(|err| {
2788            crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2789        })?;
2790        let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2791            "body".to_string(),
2792            crate::storage::schema::Value::Json(body_bytes),
2793        )];
2794
2795        // Flatten top-level keys from the body into named fields for filtering
2796        if let JsonValue::Object(ref map) = input.body {
2797            for (key, value) in map {
2798                let storage_value = json_to_storage_value(value)?;
2799                fields.push((key.clone(), storage_value));
2800            }
2801        }
2802
2803        let mut metadata = input.metadata;
2804        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2805        let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2806            .iter()
2807            .map(|(key, value)| (key.as_str(), value.clone()))
2808            .collect();
2809        let mut builder = db.row(&input.collection, columns);
2810
2811        for (key, value) in metadata {
2812            builder = builder.metadata(key, value);
2813        }
2814
2815        for node in input.node_links {
2816            builder = builder.link_to_node(node);
2817        }
2818
2819        for vector in input.vector_links {
2820            builder = builder.link_to_vector(vector);
2821        }
2822
2823        let id = builder.save()?;
2824        // Phase 1.1 MVCC universal: stamp xmin on the document.
2825        self.stamp_xmin_if_in_txn(&input.collection, id);
2826        refresh_context_index(&db, &input.collection, id)?;
2827        self.cdc_emit(
2828            crate::replication::cdc::ChangeOperation::Insert,
2829            &input.collection,
2830            id.raw(),
2831            "document",
2832        );
2833        Ok(CreateEntityOutput {
2834            id,
2835            entity: db.store().get(&input.collection, id),
2836        })
2837    }
2838
2839    fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2840        let db = self.db();
2841        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2842        let declared_model = db
2843            .collection_contract(&input.collection)
2844            .map(|contract| contract.declared_model);
2845        let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2846            contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2847            self.seal_vault_value(&input.collection, input.value)?
2848        } else {
2849            contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2850            input.value
2851        };
2852        let fields = vec![
2853            (
2854                "key".to_string(),
2855                crate::storage::schema::Value::text(input.key),
2856            ),
2857            ("value".to_string(), value),
2858        ];
2859        let collection = input.collection;
2860        let result = self.mutation_engine().apply(
2861            collection.clone(),
2862            vec![crate::runtime::mutation::MutationRow {
2863                fields,
2864                metadata: input.metadata,
2865                node_links: Vec::new(),
2866                vector_links: Vec::new(),
2867            }],
2868        )?;
2869        let id = result.ids[0];
2870        Ok(CreateEntityOutput {
2871            id,
2872            entity: db.store().get(&collection, id),
2873        })
2874    }
2875
2876    fn create_timeseries_point(
2877        &self,
2878        input: CreateTimeSeriesPointInput,
2879    ) -> RedDBResult<CreateEntityOutput> {
2880        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2881        let db = self.db();
2882        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2883        contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2884
2885        let mut fields = vec![
2886            (
2887                "metric".to_string(),
2888                crate::storage::schema::Value::text(input.metric),
2889            ),
2890            (
2891                "value".to_string(),
2892                crate::storage::schema::Value::Float(input.value),
2893            ),
2894        ];
2895
2896        if let Some(timestamp_ns) = input.timestamp_ns {
2897            fields.push((
2898                "timestamp".to_string(),
2899                crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2900            ));
2901        }
2902
2903        if !input.tags.is_empty() {
2904            let tags_json = JsonValue::Object(
2905                input
2906                    .tags
2907                    .into_iter()
2908                    .map(|(key, value)| (key, JsonValue::String(value)))
2909                    .collect(),
2910            );
2911            let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2912                crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2913            })?;
2914            fields.push((
2915                "tags".to_string(),
2916                crate::storage::schema::Value::Json(tags_bytes),
2917            ));
2918        }
2919
2920        let collection = input.collection;
2921        let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2922        // Phase 1.1 MVCC universal: stamp xmin on the point so
2923        // concurrent range scans hide it until COMMIT.
2924        self.stamp_xmin_if_in_txn(&collection, id);
2925        refresh_context_index(&db, &collection, id)?;
2926
2927        Ok(CreateEntityOutput {
2928            id,
2929            entity: db.store().get(&collection, id),
2930        })
2931    }
2932
2933    fn get_kv(
2934        &self,
2935        collection: &str,
2936        key: &str,
2937    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2938        let db = self.db();
2939        ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2940        let store = db.store();
2941        let Some(manager) = store.get_collection(collection) else {
2942            return Ok(None);
2943        };
2944        let entities = manager.query_all(|_| true);
2945        for entity in entities {
2946            if let crate::storage::EntityData::Row(ref row) = entity.data {
2947                if let Some(ref named) = row.named {
2948                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2949                        if &**k == key {
2950                            let value = named
2951                                .get("value")
2952                                .cloned()
2953                                .unwrap_or(crate::storage::schema::Value::Null);
2954                            return Ok(Some((value, entity.id)));
2955                        }
2956                    }
2957                }
2958            }
2959        }
2960        Ok(None)
2961    }
2962
2963    fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2964        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2965        let found = self.get_kv(collection, key)?;
2966        if let Some((_, id)) = found {
2967            let db = self.db();
2968            let store = db.store();
2969            let deleted = store
2970                .delete(collection, id)
2971                .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2972            if deleted {
2973                store.context_index().remove_entity(id);
2974            }
2975            Ok(deleted)
2976        } else {
2977            Ok(false)
2978        }
2979    }
2980
2981    fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2982        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2983        let PatchEntityInput {
2984            collection,
2985            id,
2986            payload,
2987            operations,
2988        } = input;
2989        let db = self.db();
2990        let store = db.store();
2991        let Some(manager) = store.get_collection(&collection) else {
2992            return Err(crate::RedDBError::NotFound(format!(
2993                "collection not found: {collection}"
2994            )));
2995        };
2996        let Some(entity) = manager.get(id) else {
2997            return Err(crate::RedDBError::NotFound(format!(
2998                "entity not found: {}",
2999                id.raw()
3000            )));
3001        };
3002        self.apply_loaded_patch_entity(collection, entity, payload, operations)
3003    }
3004
3005    fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
3006        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3007        let store = self.db().store();
3008        // Snapshot row fields before delete so we can mirror the removal
3009        // into every secondary index. The fetch is best-effort: if the
3010        // entity is already gone, the delete below is a no-op anyway.
3011        let pre_delete_fields = store
3012            .get(&input.collection, input.id)
3013            .as_ref()
3014            .map(entity_row_fields_snapshot)
3015            .unwrap_or_default();
3016        // Store delete first (source of truth). Crash between here and index removal
3017        // leaves the entity invisible to most queries but recoverable; the reverse
3018        // (remove index first, then crash) leaves the entity permanently orphaned.
3019        let deleted = store
3020            .delete(&input.collection, input.id)
3021            .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3022        if deleted {
3023            store.context_index().remove_entity(input.id);
3024            // Secondary index maintenance — surface only registry-shape
3025            // errors; missing-index removals are tolerated inside the call.
3026            if !pre_delete_fields.is_empty() {
3027                self.index_store_ref()
3028                    .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
3029                    .map_err(crate::RedDBError::Internal)?;
3030            }
3031            self.cdc_emit(
3032                crate::replication::cdc::ChangeOperation::Delete,
3033                &input.collection,
3034                input.id.raw(),
3035                "entity",
3036            );
3037        }
3038        Ok(DeleteEntityOutput {
3039            deleted,
3040            id: input.id,
3041        })
3042    }
3043}