Skip to main content

reddb_server/application/
ports_impls_entity.rs

1use std::collections::HashMap;
2
3use crate::application::entity::{
4    AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5    RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8    has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21    db: &crate::storage::unified::devx::RedDB,
22    collection: &str,
23    metadata: &mut Vec<(String, MetadataValue)>,
24) {
25    if has_internal_ttl_metadata(metadata) {
26        return;
27    }
28
29    let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30        return;
31    };
32
33    metadata.push((
34        "_ttl_ms".to_string(),
35        if default_ttl_ms <= i64::MAX as u64 {
36            MetadataValue::Int(default_ttl_ms as i64)
37        } else {
38            MetadataValue::Timestamp(default_ttl_ms)
39        },
40    ));
41}
42
43fn refresh_context_index(
44    db: &crate::storage::unified::devx::RedDB,
45    collection: &str,
46    id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48    let store = db.store();
49    let Some(entity) = store.get(collection, id) else {
50        return Ok(());
51    };
52
53    store.context_index().index_entity(collection, &entity);
54    Ok(())
55}
56
57/// Pull `(name, value)` pairs for every named column on a row entity.
58/// Returns empty if the entity is not a row, or if the row carries
59/// neither a `named` map nor a `schema` Arc — both of those mean the
60/// names aren't recoverable here, so secondary-index maintenance has
61/// nothing to act on. Used by the delete + update paths.
62pub(crate) fn entity_row_fields_snapshot(
63    entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65    let crate::storage::EntityData::Row(row) = &entity.data else {
66        return Vec::new();
67    };
68    if let Some(named) = &row.named {
69        return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70    }
71    if let Some(schema) = &row.schema {
72        return schema
73            .iter()
74            .zip(row.columns.iter())
75            .map(|(name, value)| (name.clone(), value.clone()))
76            .collect();
77    }
78    Vec::new()
79}
80
81fn ensure_collection_model_contract(
82    db: &crate::storage::unified::devx::RedDB,
83    collection: &str,
84    requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86    if let Some(contract) = db.collection_contract(collection) {
87        if !contract_enforces_model(&contract) {
88            return Ok(());
89        }
90        if collection_model_allows(contract.declared_model, requested_model) {
91            return Ok(());
92        }
93        return Err(crate::RedDBError::InvalidOperation(format!(
94            "collection '{}' is declared as '{}' and does not allow '{}' writes",
95            collection,
96            collection_model_name(contract.declared_model),
97            collection_model_name(requested_model)
98        )));
99    }
100
101    let now = implicit_contract_unix_ms();
102    db.save_collection_contract(crate::physical::CollectionContract {
103        name: collection.to_string(),
104        declared_model: requested_model,
105        schema_mode: crate::catalog::SchemaMode::Dynamic,
106        origin: crate::physical::ContractOrigin::Implicit,
107        version: 1,
108        created_at_unix_ms: now,
109        updated_at_unix_ms: now,
110        default_ttl_ms: db.collection_default_ttl_ms(collection),
111        vector_dimension: None,
112        vector_metric: None,
113        context_index_fields: Vec::new(),
114        declared_columns: Vec::new(),
115        table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116            .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117        timestamps_enabled: false,
118        context_index_enabled: false,
119        // Implicit contracts are created on first write — mutability
120        // is the default until the operator runs explicit DDL.
121        append_only: false,
122        subscriptions: Vec::new(),
123    })
124    .map(|_| ())
125    .map_err(|err| crate::RedDBError::Internal(err.to_string()))
126}
127
128fn implicit_contract_unix_ms() -> u128 {
129    std::time::SystemTime::now()
130        .duration_since(std::time::UNIX_EPOCH)
131        .unwrap_or_default()
132        .as_millis()
133}
134
135fn collection_model_allows(
136    declared_model: crate::catalog::CollectionModel,
137    requested_model: crate::catalog::CollectionModel,
138) -> bool {
139    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
140}
141
142fn ensure_vector_dimension_contract(
143    db: &crate::storage::unified::devx::RedDB,
144    collection: &str,
145    actual_dimension: usize,
146) -> RedDBResult<()> {
147    let Some(expected_dimension) = db
148        .collection_contract(collection)
149        .and_then(|contract| contract.vector_dimension)
150    else {
151        return Ok(());
152    };
153    if expected_dimension == actual_dimension {
154        return Ok(());
155    }
156    Err(crate::RedDBError::Query(format!(
157        "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
158    )))
159}
160
161fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
162    match model {
163        crate::catalog::CollectionModel::Table => "table",
164        crate::catalog::CollectionModel::Document => "document",
165        crate::catalog::CollectionModel::Graph => "graph",
166        crate::catalog::CollectionModel::Vector => "vector",
167        crate::catalog::CollectionModel::Hll => "hll",
168        crate::catalog::CollectionModel::Sketch => "sketch",
169        crate::catalog::CollectionModel::Filter => "filter",
170        crate::catalog::CollectionModel::Kv => "kv",
171        crate::catalog::CollectionModel::Config => "config",
172        crate::catalog::CollectionModel::Vault => "vault",
173        crate::catalog::CollectionModel::Mixed => "mixed",
174        crate::catalog::CollectionModel::TimeSeries => "timeseries",
175        crate::catalog::CollectionModel::Queue => "queue",
176    }
177}
178
179#[derive(Clone)]
180struct UniquenessRule {
181    name: String,
182    columns: Vec<String>,
183    primary_key: bool,
184}
185
186#[derive(Copy, Clone, PartialEq, Eq)]
187enum NormalizeMode {
188    /// First write for this row. Timestamps auto-filled from now on
189    /// both `created_at` and `updated_at`; user attempts to set
190    /// either column are rejected.
191    Insert,
192    /// Update/patch path. `created_at` is preserved from the existing
193    /// row (immutable after insert); `updated_at` is bumped to now.
194    /// User attempts to set either via the patch are rejected.
195    Update,
196}
197
198mod collection_contract_enforcement {
199    use super::*;
200
201    pub(super) struct CollectionContractWriteEnforcer<'a> {
202        db: &'a crate::storage::unified::devx::RedDB,
203        collection: &'a str,
204    }
205
206    impl<'a> CollectionContractWriteEnforcer<'a> {
207        pub(super) fn new(
208            db: &'a crate::storage::unified::devx::RedDB,
209            collection: &'a str,
210        ) -> Self {
211            Self { db, collection }
212        }
213
214        pub(super) fn ensure_model(
215            &self,
216            requested_model: crate::catalog::CollectionModel,
217        ) -> RedDBResult<()> {
218            ensure_collection_model_contract(self.db, self.collection, requested_model)
219        }
220
221        pub(super) fn normalize_insert_fields(
222            &self,
223            fields: Vec<(String, Value)>,
224        ) -> RedDBResult<Vec<(String, Value)>> {
225            normalize_row_fields_for_contract_with_mode(
226                self.db,
227                self.collection,
228                fields,
229                NormalizeMode::Insert,
230            )
231        }
232
233        pub(super) fn normalize_update_fields(
234            &self,
235            fields: Vec<(String, Value)>,
236        ) -> RedDBResult<Vec<(String, Value)>> {
237            normalize_row_fields_for_contract_with_mode(
238                self.db,
239                self.collection,
240                fields,
241                NormalizeMode::Update,
242            )
243        }
244
245        pub(super) fn enforce_row_uniqueness(
246            &self,
247            fields: &[(String, Value)],
248            exclude_id: Option<crate::storage::EntityId>,
249        ) -> RedDBResult<()> {
250            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
251        }
252
253        pub(super) fn enforce_batch_uniqueness(
254            &self,
255            rows: &[Vec<(String, Value)>],
256        ) -> RedDBResult<()> {
257            enforce_row_batch_uniqueness(self.db, self.collection, rows)
258        }
259
260        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
261            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
262        }
263    }
264}
265
266use collection_contract_enforcement::CollectionContractWriteEnforcer;
267
268fn normalize_row_fields_for_contract_with_mode(
269    db: &crate::storage::unified::devx::RedDB,
270    collection: &str,
271    fields: Vec<(String, Value)>,
272    mode: NormalizeMode,
273) -> RedDBResult<Vec<(String, Value)>> {
274    let Some(contract) = db.collection_contract(collection) else {
275        return Ok(fields);
276    };
277
278    if contract.declared_model != crate::catalog::CollectionModel::Table
279        || (contract.declared_columns.is_empty()
280            && contract
281                .table_def
282                .as_ref()
283                .map(|table| table.columns.is_empty())
284                .unwrap_or(true))
285    {
286        return Ok(fields);
287    }
288
289    // Capture the pre-normalize value of created_at (if present) so
290    // Update mode can preserve it. Also capture updated_at to detect
291    // user attempts to set it via the patch payload.
292    //
293    // Heuristic for Update mode: if fields ALREADY contains a
294    // `created_at` whose value matches the row's on-disk entity, the
295    // caller is the patch pipeline carrying forward an auto-populated
296    // column — not a user mutation. Allow pass-through in that case,
297    // then restore the original value at the end.
298    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
299        fields
300            .iter()
301            .find(|(n, _)| n == "created_at")
302            .map(|(_, v)| v.clone())
303    } else {
304        None
305    };
306
307    // Reject user attempts to set runtime-managed timestamp columns.
308    // On Insert we reject any mention; on Update we only reject when
309    // the patch pipeline handed us a NEW value (not the one we
310    // auto-populated during the last insert).
311    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
312        for (name, _) in &fields {
313            if name == "created_at" || name == "updated_at" {
314                return Err(crate::RedDBError::Query(format!(
315                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
316                    collection, name
317                )));
318            }
319        }
320    }
321
322    let mut provided = std::collections::BTreeMap::new();
323    for (name, value) in &fields {
324        provided.insert(name.clone(), value.clone());
325    }
326
327    let resolved_columns = resolved_contract_columns(&contract)?;
328    let declared_names: std::collections::BTreeSet<String> = resolved_columns
329        .iter()
330        .map(|column| column.name.clone())
331        .collect();
332    let unknown_fields: Vec<String> = fields
333        .iter()
334        .filter(|(name, _)| !declared_names.contains(name))
335        .map(|(name, _)| name.clone())
336        .collect();
337    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
338        && !unknown_fields.is_empty()
339    {
340        return Err(crate::RedDBError::Query(format!(
341            "collection '{}' is strict and does not allow undeclared fields: {}",
342            collection,
343            unknown_fields.join(", ")
344        )));
345    }
346    let mut normalized = Vec::new();
347    let now_ms = current_unix_ms_u64();
348
349    for column in &resolved_columns {
350        match provided.remove(&column.name) {
351            Some(value) => {
352                // Runtime-managed columns on Update: always overwrite
353                // with the runtime's own value (preserved created_at
354                // or fresh updated_at). User mutations are silently
355                // discarded because we reject them earlier.
356                if contract.timestamps_enabled && mode == NormalizeMode::Update {
357                    match column.name.as_str() {
358                        "created_at" => {
359                            normalized.push((
360                                column.name.clone(),
361                                existing_created_at
362                                    .clone()
363                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
364                            ));
365                            continue;
366                        }
367                        "updated_at" => {
368                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
369                            continue;
370                        }
371                        _ => {}
372                    }
373                }
374                normalized.push((
375                    column.name.clone(),
376                    normalize_contract_value(collection, column, value)?,
377                ));
378            }
379            None => {
380                // Runtime-managed timestamp columns: auto-fill with now
381                // when the contract opted in. Both get the same value on
382                // first insert so callers can order by either.
383                if contract.timestamps_enabled
384                    && (column.name == "created_at" || column.name == "updated_at")
385                {
386                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
387                    continue;
388                }
389                if let Some(default) = &column.default {
390                    normalized.push((
391                        column.name.clone(),
392                        coerce_contract_literal(collection, &column.name, column, default)?,
393                    ));
394                } else if column.not_null {
395                    return Err(crate::RedDBError::Query(format!(
396                        "missing required column '{}' for collection '{}'",
397                        column.name, collection
398                    )));
399                }
400            }
401        }
402    }
403
404    for (name, value) in fields {
405        if !declared_names.contains(&name) {
406            normalized.push((name, value));
407        }
408    }
409
410    Ok(normalized)
411}
412
413fn current_unix_ms_u64() -> u64 {
414    std::time::SystemTime::now()
415        .duration_since(std::time::UNIX_EPOCH)
416        .map(|d| d.as_millis() as u64)
417        .unwrap_or(0)
418}
419
420fn enforce_row_uniqueness(
421    db: &crate::storage::unified::devx::RedDB,
422    collection: &str,
423    fields: &[(String, Value)],
424    exclude_id: Option<crate::storage::EntityId>,
425) -> RedDBResult<()> {
426    let Some(contract) = db.collection_contract(collection) else {
427        return Ok(());
428    };
429    if contract.declared_model != crate::catalog::CollectionModel::Table
430        && contract.declared_model != crate::catalog::CollectionModel::Mixed
431    {
432        return Ok(());
433    }
434
435    let rules = resolved_uniqueness_rules(&contract);
436    if rules.is_empty() {
437        return Ok(());
438    }
439
440    let store = db.store();
441    let Some(manager) = store.get_collection(collection) else {
442        return Ok(());
443    };
444
445    let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
446
447    for rule in &rules {
448        let mut expected_signatures = Vec::new();
449        let mut skip_rule = false;
450
451        for column in &rule.columns {
452            match input_fields.get(column) {
453                Some(Value::Null) | None if rule.primary_key => {
454                    return Err(crate::RedDBError::Query(format!(
455                        "primary key '{}' in collection '{}' requires non-null column '{}'",
456                        rule.name, collection, column
457                    )))
458                }
459                Some(Value::Null) | None => {
460                    skip_rule = true;
461                    break;
462                }
463                Some(value) => {
464                    expected_signatures.push((column.clone(), value_signature(value)));
465                }
466            }
467        }
468
469        if skip_rule {
470            continue;
471        }
472
473        for entity in manager.query_all(|_| true) {
474            if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
475                continue;
476            }
477            let Some(existing_fields) = row_fields_from_entity(&entity) else {
478                continue;
479            };
480
481            let duplicate = expected_signatures.iter().all(|(column, expected)| {
482                existing_fields
483                    .get(column)
484                    .map(|value| value_signature(value) == *expected)
485                    .unwrap_or(false)
486            });
487
488            if duplicate {
489                let qualifier = if rule.primary_key {
490                    "primary key"
491                } else {
492                    "unique constraint"
493                };
494                return Err(crate::RedDBError::Query(format!(
495                    "{} '{}' violated on collection '{}' for columns [{}]",
496                    qualifier,
497                    rule.name,
498                    collection,
499                    rule.columns.join(", ")
500                )));
501            }
502        }
503    }
504
505    Ok(())
506}
507
508fn enforce_row_batch_uniqueness(
509    db: &crate::storage::unified::devx::RedDB,
510    collection: &str,
511    rows: &[Vec<(String, Value)>],
512) -> RedDBResult<()> {
513    let Some(contract) = db.collection_contract(collection) else {
514        return Ok(());
515    };
516    if contract.declared_model != crate::catalog::CollectionModel::Table
517        && contract.declared_model != crate::catalog::CollectionModel::Mixed
518    {
519        return Ok(());
520    }
521
522    let rules = resolved_uniqueness_rules(&contract);
523    if rules.is_empty() {
524        return Ok(());
525    }
526
527    for rule in &rules {
528        let mut seen = std::collections::HashMap::<String, usize>::new();
529        for (row_index, fields) in rows.iter().enumerate() {
530            let input_fields: std::collections::BTreeMap<String, Value> =
531                fields.iter().cloned().collect();
532            let mut signatures = Vec::new();
533            let mut skip_rule = false;
534
535            for column in &rule.columns {
536                match input_fields.get(column) {
537                    Some(Value::Null) | None if rule.primary_key => {
538                        return Err(crate::RedDBError::Query(format!(
539                            "primary key '{}' in collection '{}' requires non-null column '{}'",
540                            rule.name, collection, column
541                        )))
542                    }
543                    Some(Value::Null) | None => {
544                        skip_rule = true;
545                        break;
546                    }
547                    Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
548                }
549            }
550
551            if skip_rule {
552                continue;
553            }
554
555            let signature = signatures.join("|");
556            if let Some(previous_index) = seen.insert(signature, row_index) {
557                return Err(crate::RedDBError::Query(format!(
558                    "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
559                    rule.name,
560                    collection,
561                    previous_index + 1,
562                    row_index + 1
563                )));
564            }
565        }
566    }
567
568    Ok(())
569}
570
571fn row_update_requires_uniqueness_check(
572    db: &crate::storage::unified::devx::RedDB,
573    collection: &str,
574    modified_columns: &[String],
575) -> bool {
576    if modified_columns.is_empty() {
577        return false;
578    }
579
580    let Some(contract) = db.collection_contract(collection) else {
581        return false;
582    };
583    if contract.declared_model != crate::catalog::CollectionModel::Table
584        && contract.declared_model != crate::catalog::CollectionModel::Mixed
585    {
586        return false;
587    }
588
589    let rules = resolved_uniqueness_rules(&contract);
590    if rules.is_empty() {
591        return false;
592    }
593
594    rules.iter().any(|rule| {
595        rule.columns.iter().any(|column| {
596            modified_columns
597                .iter()
598                .any(|modified| modified.eq_ignore_ascii_case(column))
599        })
600    })
601}
602
603pub(crate) fn build_row_update_contract_plan(
604    db: &crate::storage::unified::devx::RedDB,
605    collection: &str,
606) -> RedDBResult<Option<RowUpdateContractPlan>> {
607    let Some(contract) = db.collection_contract(collection) else {
608        return Ok(None);
609    };
610
611    let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
612        && !(contract.declared_columns.is_empty()
613            && contract
614                .table_def
615                .as_ref()
616                .map(|table| table.columns.is_empty())
617                .unwrap_or(true))
618    {
619        resolved_contract_columns(&contract)?
620            .into_iter()
621            .map(|rule| {
622                (
623                    rule.name.clone(),
624                    RowUpdateColumnRule {
625                        name: rule.name,
626                        data_type: rule.data_type,
627                        data_type_name: rule.data_type_name,
628                        not_null: rule.not_null,
629                        enum_variants: rule.enum_variants,
630                    },
631                )
632            })
633            .collect()
634    } else {
635        HashMap::new()
636    };
637
638    let unique_columns = if matches!(
639        contract.declared_model,
640        crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
641    ) {
642        resolved_uniqueness_rules(&contract)
643            .into_iter()
644            .flat_map(|rule| rule.columns.into_iter())
645            .map(|column| (column, ()))
646            .collect()
647    } else {
648        HashMap::new()
649    };
650
651    Ok(Some(RowUpdateContractPlan {
652        timestamps_enabled: contract.timestamps_enabled,
653        strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
654        declared_rules,
655        unique_columns,
656    }))
657}
658
659pub(crate) fn normalize_row_update_assignment_with_plan(
660    collection: &str,
661    column: &str,
662    value: Value,
663    row_contract_plan: Option<&RowUpdateContractPlan>,
664) -> RedDBResult<Value> {
665    let Some(plan) = row_contract_plan else {
666        return Ok(value);
667    };
668
669    if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
670        return Err(crate::RedDBError::Query(format!(
671            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
672            collection, column
673        )));
674    }
675
676    if let Some(rule) = plan.declared_rules.get(column) {
677        let rule = ResolvedColumnRule {
678            name: rule.name.clone(),
679            data_type: rule.data_type,
680            data_type_name: rule.data_type_name.clone(),
681            not_null: rule.not_null,
682            default: None,
683            enum_variants: rule.enum_variants.clone(),
684        };
685        normalize_contract_value(collection, &rule, value)
686    } else if plan.strict_schema {
687        Err(crate::RedDBError::Query(format!(
688            "collection '{}' is strict and does not allow undeclared fields: {}",
689            collection, column
690        )))
691    } else {
692        Ok(value)
693    }
694}
695
696pub(crate) fn normalize_row_update_value_for_rule(
697    collection: &str,
698    value: Value,
699    row_rule: Option<&RowUpdateColumnRule>,
700) -> RedDBResult<Value> {
701    let Some(rule) = row_rule else {
702        return Ok(value);
703    };
704
705    let rule = ResolvedColumnRule {
706        name: rule.name.clone(),
707        data_type: rule.data_type,
708        data_type_name: rule.data_type_name.clone(),
709        not_null: rule.not_null,
710        default: None,
711        enum_variants: rule.enum_variants.clone(),
712    };
713    normalize_contract_value(collection, &rule, value)
714}
715
716fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
717    if let Some(named) = row.named.as_mut() {
718        named.insert(name.to_string(), value);
719        return;
720    }
721
722    if let Some(schema) = row.schema.as_ref() {
723        if let Some(index) = schema.iter().position(|column| column == name) {
724            if let Some(slot) = row.columns.get_mut(index) {
725                *slot = value;
726                return;
727            }
728        }
729
730        let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
731        for (column, current) in schema.iter().zip(row.columns.iter()) {
732            named.insert(column.clone(), current.clone());
733        }
734        named.insert(name.to_string(), value);
735        row.named = Some(named);
736        return;
737    }
738
739    let mut named = HashMap::with_capacity(1);
740    named.insert(name.to_string(), value);
741    row.named = Some(named);
742}
743
744fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
745    if let Some(named) = row.named.as_ref() {
746        named
747            .iter()
748            .map(|(key, value)| (key.clone(), value.clone()))
749            .collect()
750    } else if let Some(schema) = row.schema.as_ref() {
751        schema
752            .iter()
753            .cloned()
754            .zip(row.columns.iter().cloned())
755            .collect()
756    } else {
757        Vec::new()
758    }
759}
760
761fn apply_row_field_assignments_raw<I>(
762    row: &mut crate::storage::unified::entity::RowData,
763    field_assignments: I,
764) where
765    I: IntoIterator<Item = (String, Value)>,
766{
767    for (column, value) in field_assignments {
768        set_row_field(row, &column, value);
769    }
770}
771
772fn apply_row_field_assignments_incremental<I>(
773    collection: &str,
774    row: &mut crate::storage::unified::entity::RowData,
775    field_assignments: I,
776    row_contract_plan: Option<&RowUpdateContractPlan>,
777) -> RedDBResult<()>
778where
779    I: IntoIterator<Item = (String, Value)>,
780{
781    for (column, value) in field_assignments {
782        let value = normalize_row_update_assignment_with_plan(
783            collection,
784            &column,
785            value,
786            row_contract_plan,
787        )?;
788
789        set_row_field(row, &column, value);
790    }
791
792    Ok(())
793}
794
795fn resolved_uniqueness_rules(
796    contract: &crate::physical::CollectionContract,
797) -> Vec<UniquenessRule> {
798    let mut rules = Vec::new();
799
800    if let Some(table_def) = &contract.table_def {
801        if !table_def.primary_key.is_empty() {
802            rules.push(UniquenessRule {
803                name: "primary_key".to_string(),
804                columns: table_def.primary_key.clone(),
805                primary_key: true,
806            });
807        }
808
809        for constraint in &table_def.constraints {
810            if matches!(
811                constraint.constraint_type,
812                crate::storage::schema::ConstraintType::PrimaryKey
813            ) && !constraint.columns.is_empty()
814            {
815                rules.push(UniquenessRule {
816                    name: constraint.name.clone(),
817                    columns: constraint.columns.clone(),
818                    primary_key: true,
819                });
820            } else if matches!(
821                constraint.constraint_type,
822                crate::storage::schema::ConstraintType::Unique
823            ) && !constraint.columns.is_empty()
824            {
825                rules.push(UniquenessRule {
826                    name: constraint.name.clone(),
827                    columns: constraint.columns.clone(),
828                    primary_key: false,
829                });
830            }
831        }
832    } else {
833        for column in &contract.declared_columns {
834            if column.primary_key {
835                rules.push(UniquenessRule {
836                    name: format!("pk_{}", column.name),
837                    columns: vec![column.name.clone()],
838                    primary_key: true,
839                });
840            } else if column.unique {
841                rules.push(UniquenessRule {
842                    name: format!("uniq_{}", column.name),
843                    columns: vec![column.name.clone()],
844                    primary_key: false,
845                });
846            }
847        }
848    }
849
850    let mut dedup = std::collections::BTreeSet::new();
851    rules
852        .into_iter()
853        .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
854        .collect()
855}
856
857fn row_fields_from_entity(
858    entity: &crate::storage::UnifiedEntity,
859) -> Option<std::collections::BTreeMap<String, Value>> {
860    match &entity.data {
861        crate::storage::EntityData::Row(row) => {
862            if let Some(named) = &row.named {
863                Some(
864                    named
865                        .iter()
866                        .map(|(key, value)| (key.clone(), value.clone()))
867                        .collect(),
868                )
869            } else {
870                row.schema.as_ref().map(|schema| {
871                    schema
872                        .iter()
873                        .cloned()
874                        .zip(row.columns.iter().cloned())
875                        .collect()
876                })
877            }
878        }
879        _ => None,
880    }
881}
882
883fn value_signature(value: &Value) -> String {
884    format!("{value:?}")
885}
886
887fn normalize_contract_value(
888    collection: &str,
889    column: &ResolvedColumnRule,
890    value: Value,
891) -> RedDBResult<Value> {
892    if matches!(value, Value::Null) {
893        if column.not_null {
894            return Err(crate::RedDBError::Query(format!(
895                "column '{}' in collection '{}' cannot be null",
896                column.name, collection
897            )));
898        }
899        return Ok(Value::Null);
900    }
901
902    let target = column.data_type;
903    if value_matches_declared_type(&value, target) {
904        return Ok(value);
905    }
906
907    let Some(raw) = value_to_coercion_input(&value) else {
908        return Err(crate::RedDBError::Query(format!(
909            "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
910            column.name, collection, column.data_type_name
911        )));
912    };
913
914    coerce_contract_literal(collection, &column.name, column, &raw)
915}
916
917fn coerce_contract_literal(
918    collection: &str,
919    column_name: &str,
920    column: &ResolvedColumnRule,
921    raw: &str,
922) -> RedDBResult<Value> {
923    let target = column.data_type;
924    match target {
925        DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
926        DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
927        DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
928            crate::RedDBError::Query(format!(
929                "failed to coerce column '{}' in collection '{}' to '{}': {}",
930                column_name, collection, column.data_type_name, err
931            ))
932        }),
933        DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
934            crate::RedDBError::Query(format!(
935                "failed to coerce column '{}' in collection '{}' to '{}': {}",
936                column_name, collection, column.data_type_name, err
937            ))
938        }),
939        DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
940            "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
941            column_name, collection, column.data_type_name
942        ))),
943        _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
944            |err| {
945                crate::RedDBError::Query(format!(
946                    "failed to coerce column '{}' in collection '{}' to '{}': {}",
947                    column_name, collection, column.data_type_name, err
948                ))
949            },
950        ),
951    }
952}
953
954struct ResolvedColumnRule {
955    name: String,
956    data_type: DataType,
957    data_type_name: String,
958    not_null: bool,
959    default: Option<String>,
960    enum_variants: Vec<String>,
961}
962
963fn resolved_contract_columns(
964    contract: &crate::physical::CollectionContract,
965) -> RedDBResult<Vec<ResolvedColumnRule>> {
966    if let Some(table_def) = &contract.table_def {
967        return Ok(table_def
968            .columns
969            .iter()
970            .map(|column| ResolvedColumnRule {
971                name: column.name.clone(),
972                data_type: column.data_type,
973                data_type_name: data_type_name(column.data_type).to_string(),
974                not_null: !column.nullable,
975                default: column
976                    .default
977                    .as_ref()
978                    .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
979                enum_variants: column.enum_variants.clone(),
980            })
981            .collect());
982    }
983
984    contract
985        .declared_columns
986        .iter()
987        .map(|column| {
988            let data_type = column
989                .sql_type
990                .as_ref()
991                .map(crate::storage::query::resolve_sql_type_name)
992                .transpose()
993                .map_err(|err| crate::RedDBError::Query(err.to_string()))?
994                .unwrap_or(parse_declared_data_type(&column.data_type)?);
995            Ok(ResolvedColumnRule {
996                name: column.name.clone(),
997                data_type,
998                data_type_name: column.data_type.clone(),
999                not_null: column.not_null,
1000                default: column.default.clone(),
1001                enum_variants: column.enum_variants.clone(),
1002            })
1003        })
1004        .collect()
1005}
1006
1007fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1008    resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1009}
1010
1011fn data_type_name(data_type: DataType) -> &'static str {
1012    match data_type {
1013        DataType::Integer => "integer",
1014        DataType::UnsignedInteger => "unsigned_integer",
1015        DataType::Float => "float",
1016        DataType::Text => "text",
1017        DataType::Blob => "blob",
1018        DataType::Boolean => "boolean",
1019        DataType::Timestamp => "timestamp",
1020        DataType::Duration => "duration",
1021        DataType::IpAddr => "ipaddr",
1022        DataType::MacAddr => "macaddr",
1023        DataType::Vector => "vector",
1024        DataType::Nullable => "nullable",
1025        DataType::Unknown => "unknown",
1026        DataType::Json => "json",
1027        DataType::Uuid => "uuid",
1028        DataType::NodeRef => "noderef",
1029        DataType::EdgeRef => "edgeref",
1030        DataType::VectorRef => "vectorref",
1031        DataType::RowRef => "rowref",
1032        DataType::Color => "color",
1033        DataType::Email => "email",
1034        DataType::Url => "url",
1035        DataType::Phone => "phone",
1036        DataType::Semver => "semver",
1037        DataType::Cidr => "cidr",
1038        DataType::Date => "date",
1039        DataType::Time => "time",
1040        DataType::Decimal => "decimal",
1041        DataType::Enum => "enum",
1042        DataType::Array => "array",
1043        DataType::TimestampMs => "timestamp_ms",
1044        DataType::Ipv4 => "ipv4",
1045        DataType::Ipv6 => "ipv6",
1046        DataType::Subnet => "subnet",
1047        DataType::Port => "port",
1048        DataType::Latitude => "latitude",
1049        DataType::Longitude => "longitude",
1050        DataType::GeoPoint => "geopoint",
1051        DataType::Country2 => "country2",
1052        DataType::Country3 => "country3",
1053        DataType::Lang2 => "lang2",
1054        DataType::Lang5 => "lang5",
1055        DataType::Currency => "currency",
1056        DataType::AssetCode => "asset_code",
1057        DataType::Money => "money",
1058        DataType::ColorAlpha => "color_alpha",
1059        DataType::BigInt => "bigint",
1060        DataType::KeyRef => "keyref",
1061        DataType::DocRef => "docref",
1062        DataType::TableRef => "tableref",
1063        DataType::PageRef => "pageref",
1064        DataType::Secret => "secret",
1065        DataType::Password => "password",
1066        DataType::TextZstd => "text",
1067        DataType::BlobZstd => "blob",
1068    }
1069}
1070
1071fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1072    matches!(
1073        (value, target),
1074        (Value::Null, _)
1075            | (Value::Integer(_), DataType::Integer)
1076            | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1077            | (Value::Float(_), DataType::Float)
1078            | (Value::Text(_), DataType::Text)
1079            | (Value::Blob(_), DataType::Blob)
1080            | (Value::Boolean(_), DataType::Boolean)
1081            | (Value::Timestamp(_), DataType::Timestamp)
1082            | (Value::Duration(_), DataType::Duration)
1083            | (Value::IpAddr(_), DataType::IpAddr)
1084            | (Value::MacAddr(_), DataType::MacAddr)
1085            | (Value::Vector(_), DataType::Vector)
1086            | (Value::Json(_), DataType::Json)
1087            | (Value::Uuid(_), DataType::Uuid)
1088            | (Value::NodeRef(_), DataType::NodeRef)
1089            | (Value::EdgeRef(_), DataType::EdgeRef)
1090            | (Value::VectorRef(_, _), DataType::VectorRef)
1091            | (Value::RowRef(_, _), DataType::RowRef)
1092            | (Value::Color(_), DataType::Color)
1093            | (Value::Email(_), DataType::Email)
1094            | (Value::Url(_), DataType::Url)
1095            | (Value::Phone(_), DataType::Phone)
1096            | (Value::Semver(_), DataType::Semver)
1097            | (Value::Cidr(_, _), DataType::Cidr)
1098            | (Value::Date(_), DataType::Date)
1099            | (Value::Time(_), DataType::Time)
1100            | (Value::Decimal(_), DataType::Decimal)
1101            | (Value::EnumValue(_), DataType::Enum)
1102            | (Value::Array(_), DataType::Array)
1103            | (Value::TimestampMs(_), DataType::TimestampMs)
1104            | (Value::Ipv4(_), DataType::Ipv4)
1105            | (Value::Ipv6(_), DataType::Ipv6)
1106            | (Value::Subnet(_, _), DataType::Subnet)
1107            | (Value::Port(_), DataType::Port)
1108            | (Value::Latitude(_), DataType::Latitude)
1109            | (Value::Longitude(_), DataType::Longitude)
1110            | (Value::GeoPoint(_, _), DataType::GeoPoint)
1111            | (Value::Country2(_), DataType::Country2)
1112            | (Value::Country3(_), DataType::Country3)
1113            | (Value::Lang2(_), DataType::Lang2)
1114            | (Value::Lang5(_), DataType::Lang5)
1115            | (Value::Currency(_), DataType::Currency)
1116            | (Value::ColorAlpha(_), DataType::ColorAlpha)
1117            | (Value::BigInt(_), DataType::BigInt)
1118            | (Value::KeyRef(_, _), DataType::KeyRef)
1119            | (Value::DocRef(_, _), DataType::DocRef)
1120            | (Value::TableRef(_), DataType::TableRef)
1121            | (Value::PageRef(_), DataType::PageRef)
1122            | (Value::Secret(_), DataType::Secret)
1123            | (Value::Password(_), DataType::Password)
1124    )
1125}
1126
1127fn value_to_coercion_input(value: &Value) -> Option<String> {
1128    match value {
1129        Value::Null => None,
1130        Value::Integer(value) => Some(value.to_string()),
1131        Value::UnsignedInteger(value) => Some(value.to_string()),
1132        Value::Float(value) => Some(value.to_string()),
1133        Value::Text(value) => Some(value.to_string()),
1134        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1135        Value::Boolean(value) => Some(value.to_string()),
1136        Value::Timestamp(value) => Some(value.to_string()),
1137        Value::Duration(value) => Some(value.to_string()),
1138        Value::IpAddr(value) => Some(value.to_string()),
1139        Value::MacAddr(value) => Some(format!(
1140            "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1141            value[0], value[1], value[2], value[3], value[4], value[5]
1142        )),
1143        Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1144        Value::Email(value) => Some(value.clone()),
1145        Value::Url(value) => Some(value.clone()),
1146        Value::Phone(value) => Some(value.to_string()),
1147        Value::Semver(value) => Some(format!(
1148            "{}.{}.{}",
1149            value / 1_000_000,
1150            (value / 1_000) % 1_000,
1151            value % 1_000
1152        )),
1153        Value::Date(value) => Some(value.to_string()),
1154        Value::Time(value) => Some(value.to_string()),
1155        Value::Decimal(value) => Some(value.to_string()),
1156        Value::TimestampMs(value) => Some(value.to_string()),
1157        Value::Ipv4(value) => Some(format!(
1158            "{}.{}.{}.{}",
1159            (value >> 24) & 0xFF,
1160            (value >> 16) & 0xFF,
1161            (value >> 8) & 0xFF,
1162            value & 0xFF
1163        )),
1164        Value::Port(value) => Some(value.to_string()),
1165        Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1166        Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1167        Value::GeoPoint(lat, lon) => Some(format!(
1168            "{},{}",
1169            *lat as f64 / 1_000_000.0,
1170            *lon as f64 / 1_000_000.0
1171        )),
1172        Value::BigInt(value) => Some(value.to_string()),
1173        Value::TableRef(value) => Some(value.clone()),
1174        Value::PageRef(value) => Some(value.to_string()),
1175        Value::Password(value) => Some(value.clone()),
1176        _ => None,
1177    }
1178}
1179
1180fn dedupe_modified_columns(mut modified_columns: Vec<String>) -> Vec<String> {
1181    if modified_columns.is_empty() {
1182        return modified_columns;
1183    }
1184
1185    let mut unique = Vec::with_capacity(modified_columns.len());
1186    for column in modified_columns.drain(..) {
1187        if !unique
1188            .iter()
1189            .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1190        {
1191            unique.push(column);
1192        }
1193    }
1194    unique
1195}
1196
1197impl RedDBRuntime {
1198    pub(crate) fn apply_loaded_patch_entity_core(
1199        &self,
1200        collection: String,
1201        mut entity: crate::storage::UnifiedEntity,
1202        payload: JsonValue,
1203        operations: Vec<PatchEntityOperation>,
1204    ) -> RedDBResult<AppliedEntityMutation> {
1205        let id = entity.id;
1206        let operations = normalize_ttl_patch_operations(operations)?;
1207        // Snapshot pre-patch row fields for the secondary-index hook —
1208        // empty for non-row entities, which is the desired no-op.
1209        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1210
1211        let db = self.db();
1212        let store = db.store();
1213        let Some(manager) = store.get_collection(&collection) else {
1214            return Err(crate::RedDBError::NotFound(format!(
1215                "collection not found: {collection}"
1216            )));
1217        };
1218
1219        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1220        let mut metadata_changed = false;
1221        let mut modified_columns: Vec<String> = Vec::new();
1222        let mut context_index_dirty = false;
1223
1224        let row_contract_timestamps = db
1225            .collection_contract(&collection)
1226            .map(|c| c.timestamps_enabled)
1227            .unwrap_or(false);
1228
1229        match &mut entity.data {
1230            crate::storage::EntityData::Row(row) => {
1231                let mut field_ops = Vec::new();
1232                let mut metadata_ops = Vec::new();
1233
1234                for mut op in operations {
1235                    let Some(root) = op.path.first().map(String::as_str) else {
1236                        return Err(crate::RedDBError::Query(
1237                            "patch path cannot be empty".to_string(),
1238                        ));
1239                    };
1240
1241                    match root {
1242                        "fields" | "named" => {
1243                            if op.path.len() < 2 {
1244                                return Err(crate::RedDBError::Query(
1245                                    "patch path 'fields' requires a nested key".to_string(),
1246                                ));
1247                            }
1248                            if row_contract_timestamps {
1249                                let leaf = op.path.get(1).map(String::as_str);
1250                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
1251                                    return Err(crate::RedDBError::Query(format!(
1252                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1253                                        collection,
1254                                        leaf.unwrap_or("")
1255                                    )));
1256                                }
1257                            }
1258                            op.path.remove(0);
1259                            field_ops.push(op);
1260                        }
1261                        "metadata" => {
1262                            if op.path.len() < 2 {
1263                                return Err(crate::RedDBError::Query(
1264                                    "patch path 'metadata' requires a nested key".to_string(),
1265                                ));
1266                            }
1267                            op.path.remove(0);
1268                            metadata_ops.push(op);
1269                        }
1270                        _ => {
1271                            return Err(crate::RedDBError::Query(format!(
1272                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1273                            )));
1274                        }
1275                    }
1276                }
1277
1278                if !field_ops.is_empty() {
1279                    context_index_dirty = true;
1280                    for op in &field_ops {
1281                        if let Some(col) = op.path.first() {
1282                            modified_columns.push(col.clone());
1283                        }
1284                    }
1285                    let named = row.named.get_or_insert_with(Default::default);
1286                    apply_patch_operations_to_storage_map(named, &field_ops)?;
1287                }
1288
1289                if let Some(fields) = payload
1290                    .get("fields")
1291                    .and_then(crate::json::Value::as_object)
1292                {
1293                    if row_contract_timestamps {
1294                        for key in fields.keys() {
1295                            if key == "created_at" || key == "updated_at" {
1296                                return Err(crate::RedDBError::Query(format!(
1297                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1298                                    collection, key
1299                                )));
1300                            }
1301                        }
1302                    }
1303                    context_index_dirty = true;
1304                    let named = row.named.get_or_insert_with(Default::default);
1305                    for (key, value) in fields {
1306                        modified_columns.push(key.clone());
1307                        named.insert(key.clone(), json_to_storage_value(value)?);
1308                    }
1309                }
1310
1311                if !metadata_ops.is_empty() {
1312                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1313                    let metadata = patch_metadata.get_or_insert_with(|| {
1314                        store.get_metadata(&collection, id).unwrap_or_default()
1315                    });
1316                    let mut metadata_json = metadata_to_json(metadata);
1317                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1318                        .map_err(crate::RedDBError::Query)?;
1319                    *metadata = metadata_from_json(&metadata_json)?;
1320                    metadata_changed = true;
1321                }
1322
1323                if !modified_columns.is_empty() || row_contract_timestamps {
1324                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1325                    let current_fields = if let Some(named) = row.named.take() {
1326                        named.into_iter().collect::<Vec<_>>()
1327                    } else if let Some(schema) = row.schema.as_ref() {
1328                        schema
1329                            .iter()
1330                            .cloned()
1331                            .zip(row.columns.iter().cloned())
1332                            .collect::<Vec<_>>()
1333                    } else {
1334                        Vec::new()
1335                    };
1336                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
1337                    if row_contract_timestamps {
1338                        modified_columns.push("updated_at".to_string());
1339                        context_index_dirty = true;
1340                    }
1341                    if contract.requires_uniqueness_check(&modified_columns) {
1342                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1343                    }
1344                    row.named = Some(normalized_fields.into_iter().collect());
1345                }
1346            }
1347            crate::storage::EntityData::Node(node) => {
1348                let mut field_ops = Vec::new();
1349                let mut metadata_ops = Vec::new();
1350
1351                for mut op in operations {
1352                    let Some(root) = op.path.first().map(String::as_str) else {
1353                        return Err(crate::RedDBError::Query(
1354                            "patch path cannot be empty".to_string(),
1355                        ));
1356                    };
1357
1358                    match root {
1359                        "fields" | "properties" => {
1360                            if op.path.len() < 2 {
1361                                return Err(crate::RedDBError::Query(
1362                                    "patch path 'fields' requires a nested key".to_string(),
1363                                ));
1364                            }
1365                            op.path.remove(0);
1366                            field_ops.push(op);
1367                        }
1368                        "metadata" => {
1369                            if op.path.len() < 2 {
1370                                return Err(crate::RedDBError::Query(
1371                                    "patch path 'metadata' requires a nested key".to_string(),
1372                                ));
1373                            }
1374                            op.path.remove(0);
1375                            metadata_ops.push(op);
1376                        }
1377                        _ => {
1378                            return Err(crate::RedDBError::Query(format!(
1379                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, or metadata/*"
1380                            )));
1381                        }
1382                    }
1383                }
1384
1385                if !field_ops.is_empty() {
1386                    context_index_dirty = true;
1387                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1388                }
1389
1390                if let Some(fields) = payload
1391                    .get("fields")
1392                    .and_then(crate::json::Value::as_object)
1393                {
1394                    context_index_dirty = true;
1395                    for (key, value) in fields {
1396                        node.properties
1397                            .insert(key.clone(), json_to_storage_value(value)?);
1398                    }
1399                }
1400
1401                if !metadata_ops.is_empty() {
1402                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1403                    let metadata = patch_metadata.get_or_insert_with(|| {
1404                        store.get_metadata(&collection, id).unwrap_or_default()
1405                    });
1406                    let mut metadata_json = metadata_to_json(metadata);
1407                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1408                        .map_err(crate::RedDBError::Query)?;
1409                    *metadata = metadata_from_json(&metadata_json)?;
1410                    metadata_changed = true;
1411                }
1412            }
1413            crate::storage::EntityData::Edge(edge) => {
1414                let mut field_ops = Vec::new();
1415                let mut metadata_ops = Vec::new();
1416                let mut weight_ops = Vec::new();
1417
1418                for mut op in operations {
1419                    let Some(root) = op.path.first().map(String::as_str) else {
1420                        return Err(crate::RedDBError::Query(
1421                            "patch path cannot be empty".to_string(),
1422                        ));
1423                    };
1424
1425                    match root {
1426                        "fields" | "properties" => {
1427                            if op.path.len() < 2 {
1428                                return Err(crate::RedDBError::Query(
1429                                    "patch path 'fields' requires a nested key".to_string(),
1430                                ));
1431                            }
1432                            op.path.remove(0);
1433                            field_ops.push(op);
1434                        }
1435                        "weight" => {
1436                            if op.path.len() != 1 {
1437                                return Err(crate::RedDBError::Query(
1438                                    "patch path 'weight' does not allow nested keys".to_string(),
1439                                ));
1440                            }
1441                            op.path.clear();
1442                            weight_ops.push(op);
1443                        }
1444                        "metadata" => {
1445                            if op.path.len() < 2 {
1446                                return Err(crate::RedDBError::Query(
1447                                    "patch path 'metadata' requires a nested key".to_string(),
1448                                ));
1449                            }
1450                            op.path.remove(0);
1451                            metadata_ops.push(op);
1452                        }
1453                        _ => {
1454                            return Err(crate::RedDBError::Query(format!(
1455                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1456                            )));
1457                        }
1458                    }
1459                }
1460
1461                if !field_ops.is_empty() {
1462                    context_index_dirty = true;
1463                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1464                }
1465
1466                for op in weight_ops {
1467                    context_index_dirty = true;
1468                    let value = op.value.ok_or_else(|| {
1469                        crate::RedDBError::Query("weight operations require a value".to_string())
1470                    })?;
1471
1472                    match op.op {
1473                        PatchEntityOperationType::Unset => {
1474                            return Err(crate::RedDBError::Query(
1475                                "weight cannot be unset through patch operations".to_string(),
1476                            ));
1477                        }
1478                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1479                            let Some(weight) = value.as_f64() else {
1480                                return Err(crate::RedDBError::Query(
1481                                    "weight operation requires a numeric value".to_string(),
1482                                ));
1483                            };
1484                            edge.weight = weight as f32;
1485                        }
1486                    }
1487                }
1488
1489                if let Some(fields) = payload
1490                    .get("fields")
1491                    .and_then(crate::json::Value::as_object)
1492                {
1493                    context_index_dirty = true;
1494                    for (key, value) in fields {
1495                        edge.properties
1496                            .insert(key.clone(), json_to_storage_value(value)?);
1497                    }
1498                }
1499
1500                if !metadata_ops.is_empty() {
1501                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1502                    let metadata = patch_metadata.get_or_insert_with(|| {
1503                        store.get_metadata(&collection, id).unwrap_or_default()
1504                    });
1505                    let mut metadata_json = metadata_to_json(metadata);
1506                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1507                        .map_err(crate::RedDBError::Query)?;
1508                    *metadata = metadata_from_json(&metadata_json)?;
1509                    metadata_changed = true;
1510                }
1511            }
1512            crate::storage::EntityData::Vector(vector) => {
1513                let mut field_ops = Vec::new();
1514                let mut metadata_ops = Vec::new();
1515
1516                for mut op in operations {
1517                    let Some(root) = op.path.first().map(String::as_str) else {
1518                        return Err(crate::RedDBError::Query(
1519                            "patch path cannot be empty".to_string(),
1520                        ));
1521                    };
1522
1523                    match root {
1524                        "fields" => {
1525                            if op.path.len() < 2 {
1526                                return Err(crate::RedDBError::Query(
1527                                    "patch path 'fields' requires a nested key".to_string(),
1528                                ));
1529                            }
1530                            op.path.remove(0);
1531                            let Some(target) = op.path.first().map(String::as_str) else {
1532                                return Err(crate::RedDBError::Query(
1533                                    "patch path requires a target under fields".to_string(),
1534                                ));
1535                            };
1536                            if !matches!(target, "dense" | "content" | "sparse") {
1537                                return Err(crate::RedDBError::Query(format!(
1538                                    "unsupported vector patch target '{target}'"
1539                                )));
1540                            }
1541                            field_ops.push(op);
1542                        }
1543                        "metadata" => {
1544                            if op.path.len() < 2 {
1545                                return Err(crate::RedDBError::Query(
1546                                    "patch path 'metadata' requires a nested key".to_string(),
1547                                ));
1548                            }
1549                            op.path.remove(0);
1550                            metadata_ops.push(op);
1551                        }
1552                        _ => {
1553                            return Err(crate::RedDBError::Query(format!(
1554                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1555                            )));
1556                        }
1557                    }
1558                }
1559
1560                if !field_ops.is_empty() {
1561                    context_index_dirty = true;
1562                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1563                }
1564
1565                if let Some(fields) = payload
1566                    .get("fields")
1567                    .and_then(crate::json::Value::as_object)
1568                {
1569                    context_index_dirty = true;
1570                    if let Some(content) =
1571                        fields.get("content").and_then(crate::json::Value::as_str)
1572                    {
1573                        vector.content = Some(content.to_string());
1574                    }
1575                    if let Some(dense) = fields.get("dense") {
1576                        vector.dense = dense
1577                            .as_array()
1578                            .ok_or_else(|| {
1579                                crate::RedDBError::Query(
1580                                    "field 'dense' must be an array".to_string(),
1581                                )
1582                            })?
1583                            .iter()
1584                            .map(|value| {
1585                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
1586                                    crate::RedDBError::Query(
1587                                        "field 'dense' must contain only numbers".to_string(),
1588                                    )
1589                                })
1590                            })
1591                            .collect::<Result<Vec<_>, _>>()?;
1592                    }
1593                }
1594
1595                if !metadata_ops.is_empty() {
1596                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1597                    let metadata = patch_metadata.get_or_insert_with(|| {
1598                        store.get_metadata(&collection, id).unwrap_or_default()
1599                    });
1600                    let mut metadata_json = metadata_to_json(metadata);
1601                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1602                        .map_err(crate::RedDBError::Query)?;
1603                    *metadata = metadata_from_json(&metadata_json)?;
1604                    metadata_changed = true;
1605                }
1606            }
1607            crate::storage::EntityData::TimeSeries(_)
1608            | crate::storage::EntityData::QueueMessage(_) => {
1609                return Err(crate::RedDBError::Query(
1610                    "patch operations are not supported for TimeSeries or QueueMessage entities"
1611                        .to_string(),
1612                ));
1613            }
1614        }
1615
1616        if let Some(metadata) = payload
1617            .get("metadata")
1618            .and_then(crate::json::Value::as_object)
1619        {
1620            let patch_metadata = patch_metadata
1621                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1622            for (key, value) in metadata {
1623                ensure_non_tree_reserved_metadata_key(key)?;
1624                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1625            }
1626            metadata_changed = true;
1627        }
1628
1629        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1630            let patch_metadata = patch_metadata
1631                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1632            if matches!(value, crate::storage::unified::MetadataValue::Null) {
1633                patch_metadata.remove(&key);
1634            } else {
1635                patch_metadata.set(key, value);
1636            }
1637            metadata_changed = true;
1638        }
1639
1640        entity.updated_at = std::time::SystemTime::now()
1641            .duration_since(std::time::UNIX_EPOCH)
1642            .unwrap_or_default()
1643            .as_secs();
1644
1645        modified_columns = dedupe_modified_columns(modified_columns);
1646
1647        Ok(AppliedEntityMutation {
1648            id,
1649            collection,
1650            entity,
1651            metadata: patch_metadata,
1652            modified_columns,
1653            persist_metadata: metadata_changed,
1654            context_index_dirty,
1655            replaced_entity: None,
1656            replaced_entity_previous_xmax: 0,
1657            pre_mutation_fields,
1658        })
1659    }
1660
1661    pub(crate) fn apply_loaded_sql_update_row_core(
1662        &self,
1663        collection: String,
1664        mut entity: crate::storage::UnifiedEntity,
1665        static_field_assignments: &[(String, Value)],
1666        dynamic_field_assignments: Vec<(String, Value)>,
1667        static_metadata_assignments: &[(String, MetadataValue)],
1668        dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1669        row_contract_plan: Option<&RowUpdateContractPlan>,
1670        row_modified_columns_template: &[String],
1671        row_touches_unique_columns: bool,
1672    ) -> RedDBResult<AppliedEntityMutation> {
1673        let id = entity.id;
1674        let previous_xmax = entity.xmax;
1675        let db = self.db();
1676        let store = db.store();
1677        let Some(_) = store.get_collection(&collection) else {
1678            return Err(crate::RedDBError::NotFound(format!(
1679                "collection not found: {collection}"
1680            )));
1681        };
1682
1683        let versioned_update_xid = match self.current_xid() {
1684            Some(xid) => Some(xid),
1685            None => {
1686                let snapshot_manager = self.snapshot_manager();
1687                let xid = snapshot_manager.begin();
1688                snapshot_manager.commit(xid);
1689                Some(xid)
1690            }
1691        };
1692        let mut replaced_entity = versioned_update_xid.map(|xid| {
1693            let mut old = entity.clone();
1694            old.set_xmax(xid);
1695            old
1696        });
1697
1698        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1699        let row_contract_timestamps = row_contract_plan
1700            .map(|plan| plan.timestamps_enabled)
1701            .unwrap_or(false);
1702        let mut metadata_changed = false;
1703        let mut modified_columns = row_modified_columns_template.to_vec();
1704        let mut context_index_dirty = !modified_columns.is_empty();
1705
1706        // Snapshot OLD field values BEFORE applying the assignments —
1707        // the secondary-index maintenance hook needs both before/after to
1708        // delete-then-insert under changed indexed columns.
1709        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1710
1711        let crate::storage::EntityData::Row(row) = &mut entity.data else {
1712            return Err(crate::RedDBError::Query(
1713                "SQL row update fast path requires a row entity".to_string(),
1714            ));
1715        };
1716
1717        let _ = row_contract_plan;
1718        apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1719        apply_row_field_assignments_raw(row, dynamic_field_assignments);
1720
1721        for (key, value) in static_metadata_assignments
1722            .iter()
1723            .cloned()
1724            .chain(dynamic_metadata_assignments)
1725        {
1726            ensure_non_tree_reserved_metadata_key(&key)?;
1727            patch_metadata
1728                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1729                .set(key, value);
1730            metadata_changed = true;
1731        }
1732
1733        if !modified_columns.is_empty() || row_contract_timestamps {
1734            let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1735            if row_contract_timestamps {
1736                context_index_dirty = true;
1737                set_row_field(
1738                    row,
1739                    "updated_at",
1740                    Value::UnsignedInteger(current_unix_ms_u64()),
1741                );
1742                modified_columns.push("updated_at".to_string());
1743            }
1744            if row_touches_unique_columns {
1745                let current_fields = collect_row_fields(row);
1746                contract.enforce_row_uniqueness(&current_fields, Some(id))?;
1747            }
1748        }
1749
1750        entity.updated_at = std::time::SystemTime::now()
1751            .duration_since(std::time::UNIX_EPOCH)
1752            .unwrap_or_default()
1753            .as_secs();
1754
1755        if let Some(xid) = versioned_update_xid {
1756            let logical_id = entity.logical_id();
1757            entity.id = store.next_entity_id();
1758            entity.set_logical_id(logical_id);
1759            entity.set_xmin(xid);
1760            entity.set_xmax(0);
1761            if let Some(old) = replaced_entity.as_mut() {
1762                old.set_xmax(xid);
1763            }
1764        }
1765
1766        modified_columns = dedupe_modified_columns(modified_columns);
1767
1768        Ok(AppliedEntityMutation {
1769            id: entity.id,
1770            collection,
1771            entity,
1772            metadata: patch_metadata,
1773            modified_columns,
1774            persist_metadata: metadata_changed,
1775            context_index_dirty,
1776            replaced_entity,
1777            replaced_entity_previous_xmax: previous_xmax,
1778            pre_mutation_fields,
1779        })
1780    }
1781
1782    pub(crate) fn persist_applied_entity_mutations(
1783        &self,
1784        applied: &[AppliedEntityMutation],
1785    ) -> RedDBResult<()> {
1786        if applied.is_empty() {
1787            return Ok(());
1788        }
1789
1790        let store = self.db().store();
1791        let collection = &applied[0].collection;
1792        let Some(manager) = store.get_collection(collection) else {
1793            return Err(crate::RedDBError::NotFound(format!(
1794                "collection not found: {collection}"
1795            )));
1796        };
1797
1798        let mut ordinary = Vec::with_capacity(applied.len());
1799        for item in applied {
1800            if let Some(old_version) = item.replaced_entity.as_ref() {
1801                store
1802                    .install_versioned_table_row_update(
1803                        collection,
1804                        old_version.clone(),
1805                        item.entity.clone(),
1806                        if item.persist_metadata {
1807                            item.metadata.as_ref()
1808                        } else {
1809                            None
1810                        },
1811                    )
1812                    .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1813                if self.current_xid().is_some() {
1814                    self.record_pending_versioned_update(
1815                        crate::runtime::impl_core::current_connection_id(),
1816                        collection,
1817                        old_version.id,
1818                        item.entity.id,
1819                        old_version.xmax,
1820                        item.replaced_entity_previous_xmax,
1821                    );
1822                }
1823            } else {
1824                ordinary.push(item);
1825            }
1826        }
1827        if ordinary.is_empty() {
1828            return Ok(());
1829        }
1830
1831        manager
1832            .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1833                (
1834                    &item.entity,
1835                    item.modified_columns.as_slice(),
1836                    if item.persist_metadata {
1837                        item.metadata.as_ref()
1838                    } else {
1839                        None
1840                    },
1841                )
1842            }))
1843            .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1844
1845        // PG-HOT-like fast path: segment in-place is done; when no
1846        // mutation touches a secondary-indexed column AND no metadata
1847        // payload needs to be folded into a B-tree record, skip the
1848        // in-line B-tree upsert. The WAL still records the update
1849        // (durability preserved; recovery replay rebuilds the B-tree),
1850        // and `manager.get()` prefers the live segment over the
1851        // B-tree for reads — so the short-circuit is invisible to
1852        // callers. See `persist_entities_to_pager_wal_only`.
1853        let indexed_cols = self
1854            .index_store_ref()
1855            .indexed_columns_set(collection.as_str());
1856        let all_hot = !indexed_cols.is_empty()
1857            && ordinary.iter().all(|item| {
1858                !item.persist_metadata
1859                    && !item
1860                        .modified_columns
1861                        .iter()
1862                        .any(|c| indexed_cols.contains(c))
1863            })
1864            || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
1865
1866        // Pass `&[&UnifiedEntity]` — no per-entity clone. The SQL UPDATE
1867        // inner loop hands us `applied` which already owns the post-image
1868        // entity; all we need for the persist path is a read borrow.
1869        let entity_refs: Vec<&crate::storage::UnifiedEntity> =
1870            ordinary.iter().map(|item| &item.entity).collect();
1871        let persist_fn = if all_hot {
1872            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
1873        } else {
1874            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
1875        };
1876        persist_fn(store.as_ref(), collection, &entity_refs)
1877            .map_err(|err| crate::RedDBError::Internal(err.to_string()))
1878    }
1879
1880    pub(crate) fn flush_applied_entity_mutation(
1881        &self,
1882        applied: &AppliedEntityMutation,
1883    ) -> RedDBResult<()> {
1884        let store = self.db().store();
1885        if applied.context_index_dirty {
1886            store
1887                .context_index()
1888                .index_entity(&applied.collection, &applied.entity);
1889        }
1890        // Secondary-index maintenance for SQL UPDATE / JSON-Patch flows.
1891        // Skip when pre_mutation_fields is empty (entity wasn't a row, or
1892        // didn't carry recoverable column names) — there's nothing to
1893        // delete-then-insert in that case.
1894        //
1895        // Also build the CDC damage vector here so downstream consumers
1896        // see which columns changed without re-diffing.
1897        let mut changed_columns: Option<Vec<String>> = None;
1898        if !applied.pre_mutation_fields.is_empty() {
1899            let post = entity_row_fields_snapshot(&applied.entity);
1900            if !post.is_empty() {
1901                let damage = crate::application::entity::row_damage_vector(
1902                    &applied.pre_mutation_fields,
1903                    &post,
1904                );
1905                if !damage.is_empty() {
1906                    changed_columns = Some(
1907                        damage
1908                            .touched_columns()
1909                            .into_iter()
1910                            .map(str::to_string)
1911                            .collect(),
1912                    );
1913                }
1914
1915                // HOT-like fast path (P3.T2/T3): when no modified
1916                // column is covered by a secondary index, skip the
1917                // `index_entity_update` call entirely. The function
1918                // would short-circuit internally, but the call still
1919                // reads the registry lock + walks the damage vector
1920                // — avoiding it saves a few microseconds per UPDATE.
1921                // Page-local replace + t_ctid chain support (true
1922                // HOT) lives in a follow-up storage spec.
1923                let indexed_cols: std::collections::HashSet<String> = self
1924                    .index_store_ref()
1925                    .list_indices(applied.collection.as_str())
1926                    .into_iter()
1927                    .filter_map(|idx| idx.columns.first().cloned())
1928                    .collect();
1929                let modified_cols: std::collections::HashSet<String> = damage
1930                    .touched_columns()
1931                    .into_iter()
1932                    .map(str::to_string)
1933                    .collect();
1934                if let Some(old_version) = applied.replaced_entity.as_ref() {
1935                    let old_index_fields: Vec<(String, Value)> = applied
1936                        .pre_mutation_fields
1937                        .iter()
1938                        .filter(|(col, _)| indexed_cols.contains(col))
1939                        .cloned()
1940                        .collect();
1941                    let new_index_fields: Vec<(String, Value)> = post
1942                        .iter()
1943                        .filter(|(col, _)| indexed_cols.contains(col))
1944                        .cloned()
1945                        .collect();
1946                    if !old_index_fields.is_empty() {
1947                        self.index_store_ref()
1948                            .index_entity_delete(
1949                                &applied.collection,
1950                                old_version.id,
1951                                &old_index_fields,
1952                            )
1953                            .map_err(crate::RedDBError::Internal)?;
1954                    }
1955                    if !new_index_fields.is_empty() {
1956                        self.index_store_ref()
1957                            .index_entity_insert(
1958                                &applied.collection,
1959                                applied.entity.id,
1960                                &new_index_fields,
1961                            )
1962                            .map_err(crate::RedDBError::Internal)?;
1963                    }
1964                } else {
1965                    let decision = crate::storage::engine::hot_update::decide(
1966                        &crate::storage::engine::hot_update::HotUpdateInputs {
1967                            collection: applied.collection.as_str(),
1968                            indexed_columns: &indexed_cols,
1969                            modified_columns: &modified_cols,
1970                            // The storage layer currently handles fit via
1971                            // the segment abstraction; we bypass the
1972                            // page-size check here.
1973                            new_tuple_size: 0,
1974                            page_free_space: usize::MAX,
1975                        },
1976                    );
1977                    if !decision.can_hot {
1978                        self.index_store_ref()
1979                            .index_entity_update(
1980                                &applied.collection,
1981                                applied.id,
1982                                &applied.pre_mutation_fields,
1983                                &post,
1984                            )
1985                            .map_err(crate::RedDBError::Internal)?;
1986                    } else {
1987                        // F-04: `applied.collection` is tenant-supplied;
1988                        // strip CR/LF/control bytes via the LogField
1989                        // escaper (ADR 0010).
1990                        tracing::debug!(
1991                            collection = %reddb_wire::audit_safe_log_field(&applied.collection),
1992                            "hot_update fast-path: skipped index_entity_update"
1993                        );
1994                    }
1995                }
1996            }
1997        }
1998        self.cdc_emit_prebuilt_with_columns(
1999            crate::replication::cdc::ChangeOperation::Update,
2000            &applied.collection,
2001            &applied.entity,
2002            "entity",
2003            applied.metadata.as_ref(),
2004            true,
2005            changed_columns,
2006        );
2007        Ok(())
2008    }
2009
2010    pub(crate) fn apply_loaded_patch_entity(
2011        &self,
2012        collection: String,
2013        entity: crate::storage::UnifiedEntity,
2014        payload: JsonValue,
2015        operations: Vec<PatchEntityOperation>,
2016    ) -> RedDBResult<CreateEntityOutput> {
2017        let applied =
2018            self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2019        self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2020        self.flush_applied_entity_mutation(&applied)?;
2021        Ok(CreateEntityOutput {
2022            id: applied.id,
2023            entity: Some(applied.entity),
2024        })
2025    }
2026}
2027
2028fn ensure_non_tree_reserved_metadata_patch_paths(
2029    operations: &[PatchEntityOperation],
2030) -> RedDBResult<()> {
2031    for operation in operations {
2032        let Some(key) = operation.path.first().map(String::as_str) else {
2033            continue;
2034        };
2035        ensure_non_tree_reserved_metadata_key(key)?;
2036    }
2037    Ok(())
2038}
2039
2040fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2041    if key.starts_with(TREE_METADATA_PREFIX) {
2042        return Err(crate::RedDBError::Query(format!(
2043            "metadata key '{}' is reserved for managed trees",
2044            key
2045        )));
2046    }
2047    Ok(())
2048}
2049
2050fn ensure_non_tree_reserved_metadata_entries(
2051    metadata: &[(String, MetadataValue)],
2052) -> RedDBResult<()> {
2053    for (key, _) in metadata {
2054        ensure_non_tree_reserved_metadata_key(key)?;
2055    }
2056    Ok(())
2057}
2058
2059fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2060    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2061        return Err(crate::RedDBError::Query(format!(
2062            "edge label '{}' is reserved for managed trees",
2063            TREE_CHILD_EDGE_LABEL
2064        )));
2065    }
2066    Ok(())
2067}
2068
2069impl RedDBRuntime {
2070    pub(crate) fn create_node_unchecked(
2071        &self,
2072        input: CreateNodeInput,
2073    ) -> RedDBResult<CreateEntityOutput> {
2074        let db = self.db();
2075        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2076        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2077        let mut metadata = input.metadata;
2078        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2079        let mut builder = db.node(&input.collection, &input.label);
2080
2081        if let Some(node_type) = input.node_type {
2082            builder = builder.node_type(node_type);
2083        }
2084
2085        for (key, value) in input.properties {
2086            builder = builder.property(key, value);
2087        }
2088
2089        for (key, value) in metadata {
2090            builder = builder.metadata(key, value);
2091        }
2092
2093        for embedding in input.embeddings {
2094            if let Some(model) = embedding.model {
2095                builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2096            } else {
2097                builder = builder.embedding(embedding.name, embedding.vector);
2098            }
2099        }
2100
2101        for link in input.table_links {
2102            builder = builder.link_to_table(link.key, link.table);
2103        }
2104
2105        for link in input.node_links {
2106            builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2107        }
2108
2109        let id = builder.save()?;
2110        // Phase 1.1 MVCC universal: stamp xmin so concurrent snapshots
2111        // don't see this node until the transaction commits.
2112        self.stamp_xmin_if_in_txn(&input.collection, id);
2113        refresh_context_index(&db, &input.collection, id)?;
2114        self.cdc_emit(
2115            crate::replication::cdc::ChangeOperation::Insert,
2116            &input.collection,
2117            id.raw(),
2118            "graph_node",
2119        );
2120        Ok(CreateEntityOutput {
2121            id,
2122            entity: db.store().get(&input.collection, id),
2123        })
2124    }
2125
2126    pub(crate) fn create_edge_unchecked(
2127        &self,
2128        input: CreateEdgeInput,
2129    ) -> RedDBResult<CreateEntityOutput> {
2130        let db = self.db();
2131        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2132        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2133        let mut metadata = input.metadata;
2134        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2135        let mut builder = db
2136            .edge(&input.collection, &input.label)
2137            .from(input.from)
2138            .to(input.to);
2139
2140        if let Some(weight) = input.weight {
2141            builder = builder.weight(weight);
2142        }
2143
2144        for (key, value) in input.properties {
2145            builder = builder.property(key, value);
2146        }
2147
2148        for (key, value) in metadata {
2149            builder = builder.metadata(key, value);
2150        }
2151
2152        let id = builder.save()?;
2153        // Phase 1.1 MVCC universal: stamp xmin on the edge so other
2154        // sessions don't follow it until COMMIT.
2155        self.stamp_xmin_if_in_txn(&input.collection, id);
2156        refresh_context_index(&db, &input.collection, id)?;
2157        self.cdc_emit(
2158            crate::replication::cdc::ChangeOperation::Insert,
2159            &input.collection,
2160            id.raw(),
2161            "graph_edge",
2162        );
2163        Ok(CreateEntityOutput {
2164            id,
2165            entity: db.store().get(&input.collection, id),
2166        })
2167    }
2168}
2169
2170fn create_rows_batch_prevalidated_columnar_with_outputs(
2171    runtime: &RedDBRuntime,
2172    collection: String,
2173    column_names: std::sync::Arc<Vec<String>>,
2174    rows: Vec<Vec<crate::storage::schema::Value>>,
2175) -> RedDBResult<Vec<CreateEntityOutput>> {
2176    use crate::storage::{
2177        unified::{EntityData, EntityKind, RowData},
2178        EntityId, UnifiedEntity,
2179    };
2180    use std::sync::Arc;
2181
2182    if rows.is_empty() {
2183        return Ok(Vec::new());
2184    }
2185    runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2186    runtime.check_batch_size(rows.len())?;
2187    runtime.check_db_size()?;
2188
2189    let db = runtime.db();
2190    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2191    contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2192
2193    let store = db.store();
2194    let table_arc: Arc<str> = Arc::from(collection.as_str());
2195
2196    let indexed_cols = runtime
2197        .index_store_ref()
2198        .indexed_columns_set(collection.as_str());
2199    let has_secondary_indexes = !indexed_cols.is_empty();
2200    let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2201        if has_secondary_indexes {
2202            Vec::with_capacity(rows.len())
2203        } else {
2204            Vec::new()
2205        };
2206
2207    let entities: Vec<UnifiedEntity> = rows
2208        .into_iter()
2209        .map(|values| {
2210            if has_secondary_indexes {
2211                let mut snap: Vec<(String, crate::storage::schema::Value)> =
2212                    Vec::with_capacity(indexed_cols.len());
2213                for (name, val) in column_names.iter().zip(values.iter()) {
2214                    if indexed_cols.contains(name) {
2215                        snap.push((name.clone(), val.clone()));
2216                    }
2217                }
2218                field_snapshots.push(snap);
2219            }
2220            let mut row = RowData::new(values);
2221            row.schema = Some(Arc::clone(&column_names));
2222            UnifiedEntity::new(
2223                EntityId::new(0),
2224                EntityKind::TableRow {
2225                    table: Arc::clone(&table_arc),
2226                    row_id: 0,
2227                },
2228                EntityData::Row(row),
2229            )
2230        })
2231        .collect();
2232
2233    let ids = store
2234        .bulk_insert(&collection, entities)
2235        .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2236
2237    if has_secondary_indexes {
2238        let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2239            .iter()
2240            .zip(field_snapshots)
2241            .map(|(id, fields)| (*id, fields))
2242            .collect();
2243        runtime
2244            .index_store_ref()
2245            .index_entity_insert_batch(&collection, &index_rows)
2246            .map_err(crate::RedDBError::Internal)?;
2247    }
2248
2249    runtime.invalidate_result_cache();
2250    runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2251
2252    Ok(ids
2253        .into_iter()
2254        .map(|id| CreateEntityOutput { id, entity: None })
2255        .collect())
2256}
2257
2258impl RuntimeEntityPort for RedDBRuntime {
2259    fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2260        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2261        let db = self.db();
2262        let CreateRowInput {
2263            collection,
2264            fields,
2265            metadata: input_metadata,
2266            node_links,
2267            vector_links,
2268        } = input;
2269        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2270        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2271        let mut metadata = input_metadata;
2272        apply_collection_default_ttl(&db, &collection, &mut metadata);
2273        let fields = contract.normalize_insert_fields(fields)?;
2274        contract.enforce_row_uniqueness(&fields, None)?;
2275        // Route through MutationEngine for unified hot path.
2276        let engine = self.mutation_engine();
2277        let result = engine.apply(
2278            collection.clone(),
2279            vec![crate::runtime::mutation::MutationRow {
2280                fields,
2281                metadata,
2282                node_links,
2283                vector_links,
2284            }],
2285        )?;
2286        let id = result.ids[0];
2287        // Perf: `db.get(id)` does a *cross-collection* scan (get_any)
2288        // that also takes a write lock on the entity cache. We know
2289        // the collection — hit the manager directly. Cuts
2290        // create_row() p50 roughly in half on the hot path.
2291        Ok(CreateEntityOutput {
2292            id,
2293            entity: db.store().get(&collection, id),
2294        })
2295    }
2296
2297    fn create_rows_batch(
2298        &self,
2299        input: CreateRowsBatchInput,
2300    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2301        if input.rows.is_empty() {
2302            return Ok(Vec::new());
2303        }
2304        self.check_batch_size(input.rows.len())?;
2305        self.check_db_size()?;
2306
2307        let db = self.db();
2308        let collection = input.collection;
2309        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2310        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2311
2312        let mut prepared_rows = Vec::with_capacity(input.rows.len());
2313        let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2314        for row in input.rows {
2315            if row.collection != collection {
2316                return Err(crate::RedDBError::Query(format!(
2317                    "batch row collection mismatch: expected '{}', got '{}'",
2318                    collection, row.collection
2319                )));
2320            }
2321
2322            let mut metadata = row.metadata;
2323            apply_collection_default_ttl(&db, &collection, &mut metadata);
2324            let fields = contract.normalize_insert_fields(row.fields)?;
2325            contract.enforce_row_uniqueness(&fields, None)?;
2326            uniqueness_rows.push(fields.clone());
2327            prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2328        }
2329
2330        contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2331
2332        // Route through MutationEngine: single bulk_insert + one CDC batch
2333        // instead of N separate cdc_emit() calls (each acquires a write lock).
2334        let engine = {
2335            let e = self.mutation_engine();
2336            if input.suppress_events {
2337                e.with_suppress_events()
2338            } else {
2339                e
2340            }
2341        };
2342        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2343            .into_iter()
2344            .map(|(fields, metadata, node_links, vector_links)| {
2345                crate::runtime::mutation::MutationRow {
2346                    fields,
2347                    metadata,
2348                    node_links,
2349                    vector_links,
2350                }
2351            })
2352            .collect();
2353
2354        let result = engine
2355            .apply(collection.clone(), mutation_rows)
2356            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2357
2358        let store = db.store();
2359        Ok(result
2360            .ids
2361            .into_iter()
2362            .map(|id| CreateEntityOutput {
2363                id,
2364                entity: store.get(&collection, id),
2365            })
2366            .collect())
2367    }
2368
2369    fn create_rows_batch_prevalidated_columnar(
2370        &self,
2371        collection: String,
2372        column_names: std::sync::Arc<Vec<String>>,
2373        rows: Vec<Vec<crate::storage::schema::Value>>,
2374    ) -> RedDBResult<usize> {
2375        create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2376            .map(|outputs| outputs.len())
2377    }
2378
2379    fn create_rows_batch_columnar(
2380        &self,
2381        collection: String,
2382        column_names: std::sync::Arc<Vec<String>>,
2383        rows: Vec<Vec<crate::storage::schema::Value>>,
2384    ) -> RedDBResult<usize> {
2385        self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2386            .map(|outputs| outputs.len())
2387    }
2388
2389    fn create_rows_batch_columnar_with_outputs(
2390        &self,
2391        collection: String,
2392        column_names: std::sync::Arc<Vec<String>>,
2393        rows: Vec<Vec<crate::storage::schema::Value>>,
2394    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2395        if rows.is_empty() {
2396            return Ok(Vec::new());
2397        }
2398        self.check_batch_size(rows.len())?;
2399        self.check_db_size()?;
2400
2401        let db = self.db();
2402        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2403        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2404
2405        // Fast path: when the collection carries no contract (or the
2406        // contract has no declared columns) `normalize_insert_fields`
2407        // is a no-op and `enforce_row_uniqueness` finds no unique
2408        // columns to check. Skip the per-row `(String, Value)` tuple
2409        // materialisation entirely and route through the prevalidated
2410        // columnar kernel — same write, CDC, and index path, just
2411        // without the wasted (String, Value) clones. This is the
2412        // bench `bench_users` shape (no contract declared by the
2413        // adapter's `setup_schema`).
2414        let needs_normalisation = match db.collection_contract(&collection) {
2415            Some(c) => {
2416                c.declared_model == crate::catalog::CollectionModel::Table
2417                    && (!c.declared_columns.is_empty()
2418                        || c.table_def
2419                            .as_ref()
2420                            .map(|t| !t.columns.is_empty())
2421                            .unwrap_or(false))
2422            }
2423            None => false,
2424        };
2425        if !needs_normalisation {
2426            return create_rows_batch_prevalidated_columnar_with_outputs(
2427                self,
2428                collection,
2429                column_names,
2430                rows,
2431            );
2432        }
2433
2434        // Slow path: contract requires per-row normalisation /
2435        // uniqueness checks. Materialise `Vec<(String, Value)>` from
2436        // the columnar layout (this still pays N×ncols `String::clone`
2437        // — but it's deferred out of the wire decoder hot loop and
2438        // happens behind a runtime API, not the per-row decode loop)
2439        // and fall through to the existing `create_rows_batch` path.
2440        let ncols = column_names.len();
2441        let tuple_rows: Vec<CreateRowInput> = rows
2442            .into_iter()
2443            .map(|values| {
2444                let mut fields: Vec<(String, crate::storage::schema::Value)> =
2445                    Vec::with_capacity(ncols);
2446                for (name, value) in column_names.iter().zip(values) {
2447                    fields.push((name.clone(), value));
2448                }
2449                CreateRowInput {
2450                    collection: collection.clone(),
2451                    fields,
2452                    metadata: Vec::new(),
2453                    node_links: Vec::new(),
2454                    vector_links: Vec::new(),
2455                }
2456            })
2457            .collect();
2458        self.create_rows_batch(CreateRowsBatchInput {
2459            collection,
2460            rows: tuple_rows,
2461            suppress_events: false,
2462        })
2463    }
2464
2465    fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2466        if input.rows.is_empty() {
2467            return Ok(0);
2468        }
2469        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2470        self.check_batch_size(input.rows.len())?;
2471        self.check_db_size()?;
2472
2473        let db = self.db();
2474        let collection = input.collection;
2475        // Still verify the collection's declared model before we blast
2476        // rows at it — this one-off check is O(1), independent of
2477        // ncols, and catches schema-kind mismatches that the client
2478        // can't always see.
2479        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2480        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2481
2482        // Hoist the per-collection default TTL lookup out of the
2483        // per-row loop — it only depends on the collection, not on
2484        // individual rows, and the old path did one HashMap read per
2485        // row (25k reads for a 25k typed_insert bulk). Fetch it once,
2486        // apply to any row whose metadata doesn't already carry a TTL.
2487        let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2488
2489        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2490            Vec::with_capacity(input.rows.len());
2491        let mut mutation_rows = mutation_rows;
2492        for row in input.rows {
2493            if row.collection != collection {
2494                return Err(crate::RedDBError::Query(format!(
2495                    "batch row collection mismatch: expected '{}', got '{}'",
2496                    collection, row.collection
2497                )));
2498            }
2499            let mut metadata = row.metadata;
2500            if let Some(ttl) = default_ttl_ms {
2501                if !has_internal_ttl_metadata(&metadata) {
2502                    metadata.push((
2503                        "_ttl_ms".to_string(),
2504                        if ttl <= i64::MAX as u64 {
2505                            MetadataValue::Int(ttl as i64)
2506                        } else {
2507                            MetadataValue::Timestamp(ttl)
2508                        },
2509                    ));
2510                }
2511            }
2512            mutation_rows.push(crate::runtime::mutation::MutationRow {
2513                fields: row.fields,
2514                metadata,
2515                node_links: row.node_links,
2516                vector_links: row.vector_links,
2517            });
2518        }
2519
2520        let engine = self.mutation_engine();
2521        let result = engine
2522            .apply(collection, mutation_rows)
2523            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2524        Ok(result.ids.len())
2525    }
2526
2527    fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2528        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2529        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2530        self.create_node_unchecked(input)
2531    }
2532
2533    fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2534        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2535        ensure_non_tree_structural_edge_label(&input.label)?;
2536        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2537        self.create_edge_unchecked(input)
2538    }
2539
2540    fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2541        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2542        let db = self.db();
2543        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2544        contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2545        ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2546        let mut metadata = input.metadata;
2547        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2548        let mut builder = db.vector(&input.collection).dense(input.dense);
2549
2550        if let Some(content) = input.content {
2551            builder = builder.content(content);
2552        }
2553
2554        for (key, value) in metadata {
2555            builder = builder.metadata(key, value);
2556        }
2557
2558        if let Some(link_row) = input.link_row {
2559            builder = builder.link_to_table(link_row);
2560        }
2561
2562        if let Some(link_node) = input.link_node {
2563            builder = builder.link_to_node(link_node);
2564        }
2565
2566        let id = builder.save()?;
2567        // Phase 1.1 MVCC universal: stamp xmin on the vector so
2568        // concurrent ANN scans hide it until the transaction commits.
2569        self.stamp_xmin_if_in_txn(&input.collection, id);
2570        refresh_context_index(&db, &input.collection, id)?;
2571        self.cdc_emit(
2572            crate::replication::cdc::ChangeOperation::Insert,
2573            &input.collection,
2574            id.raw(),
2575            "vector",
2576        );
2577        Ok(CreateEntityOutput {
2578            id,
2579            entity: db.store().get(&input.collection, id),
2580        })
2581    }
2582
2583    fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2584        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2585        let db = self.db();
2586        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2587        contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2588
2589        // Serialize the full body as Value::Json for the "body" field
2590        let body_bytes = json_to_vec(&input.body).map_err(|err| {
2591            crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2592        })?;
2593        let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2594            "body".to_string(),
2595            crate::storage::schema::Value::Json(body_bytes),
2596        )];
2597
2598        // Flatten top-level keys from the body into named fields for filtering
2599        if let JsonValue::Object(ref map) = input.body {
2600            for (key, value) in map {
2601                let storage_value = json_to_storage_value(value)?;
2602                fields.push((key.clone(), storage_value));
2603            }
2604        }
2605
2606        let mut metadata = input.metadata;
2607        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2608        let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2609            .iter()
2610            .map(|(key, value)| (key.as_str(), value.clone()))
2611            .collect();
2612        let mut builder = db.row(&input.collection, columns);
2613
2614        for (key, value) in metadata {
2615            builder = builder.metadata(key, value);
2616        }
2617
2618        for node in input.node_links {
2619            builder = builder.link_to_node(node);
2620        }
2621
2622        for vector in input.vector_links {
2623            builder = builder.link_to_vector(vector);
2624        }
2625
2626        let id = builder.save()?;
2627        // Phase 1.1 MVCC universal: stamp xmin on the document.
2628        self.stamp_xmin_if_in_txn(&input.collection, id);
2629        refresh_context_index(&db, &input.collection, id)?;
2630        self.cdc_emit(
2631            crate::replication::cdc::ChangeOperation::Insert,
2632            &input.collection,
2633            id.raw(),
2634            "document",
2635        );
2636        Ok(CreateEntityOutput {
2637            id,
2638            entity: db.store().get(&input.collection, id),
2639        })
2640    }
2641
2642    fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2643        let db = self.db();
2644        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2645        let declared_model = db
2646            .collection_contract(&input.collection)
2647            .map(|contract| contract.declared_model);
2648        let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2649            contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2650            self.seal_vault_value(&input.collection, input.value)?
2651        } else {
2652            contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2653            input.value
2654        };
2655        let fields = vec![
2656            (
2657                "key".to_string(),
2658                crate::storage::schema::Value::text(input.key),
2659            ),
2660            ("value".to_string(), value),
2661        ];
2662        let collection = input.collection;
2663        let result = self.mutation_engine().apply(
2664            collection.clone(),
2665            vec![crate::runtime::mutation::MutationRow {
2666                fields,
2667                metadata: input.metadata,
2668                node_links: Vec::new(),
2669                vector_links: Vec::new(),
2670            }],
2671        )?;
2672        let id = result.ids[0];
2673        Ok(CreateEntityOutput {
2674            id,
2675            entity: db.store().get(&collection, id),
2676        })
2677    }
2678
2679    fn create_timeseries_point(
2680        &self,
2681        input: CreateTimeSeriesPointInput,
2682    ) -> RedDBResult<CreateEntityOutput> {
2683        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2684        let db = self.db();
2685        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2686        contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2687
2688        let mut fields = vec![
2689            (
2690                "metric".to_string(),
2691                crate::storage::schema::Value::text(input.metric),
2692            ),
2693            (
2694                "value".to_string(),
2695                crate::storage::schema::Value::Float(input.value),
2696            ),
2697        ];
2698
2699        if let Some(timestamp_ns) = input.timestamp_ns {
2700            fields.push((
2701                "timestamp".to_string(),
2702                crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2703            ));
2704        }
2705
2706        if !input.tags.is_empty() {
2707            let tags_json = JsonValue::Object(
2708                input
2709                    .tags
2710                    .into_iter()
2711                    .map(|(key, value)| (key, JsonValue::String(value)))
2712                    .collect(),
2713            );
2714            let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2715                crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2716            })?;
2717            fields.push((
2718                "tags".to_string(),
2719                crate::storage::schema::Value::Json(tags_bytes),
2720            ));
2721        }
2722
2723        let collection = input.collection;
2724        let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2725        // Phase 1.1 MVCC universal: stamp xmin on the point so
2726        // concurrent range scans hide it until COMMIT.
2727        self.stamp_xmin_if_in_txn(&collection, id);
2728        refresh_context_index(&db, &collection, id)?;
2729
2730        Ok(CreateEntityOutput {
2731            id,
2732            entity: db.store().get(&collection, id),
2733        })
2734    }
2735
2736    fn get_kv(
2737        &self,
2738        collection: &str,
2739        key: &str,
2740    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2741        let db = self.db();
2742        ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2743        let store = db.store();
2744        let Some(manager) = store.get_collection(collection) else {
2745            return Ok(None);
2746        };
2747        let entities = manager.query_all(|_| true);
2748        for entity in entities {
2749            if let crate::storage::EntityData::Row(ref row) = entity.data {
2750                if let Some(ref named) = row.named {
2751                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2752                        if &**k == key {
2753                            let value = named
2754                                .get("value")
2755                                .cloned()
2756                                .unwrap_or(crate::storage::schema::Value::Null);
2757                            return Ok(Some((value, entity.id)));
2758                        }
2759                    }
2760                }
2761            }
2762        }
2763        Ok(None)
2764    }
2765
2766    fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2767        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2768        let found = self.get_kv(collection, key)?;
2769        if let Some((_, id)) = found {
2770            let db = self.db();
2771            let store = db.store();
2772            let deleted = store
2773                .delete(collection, id)
2774                .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2775            if deleted {
2776                store.context_index().remove_entity(id);
2777            }
2778            Ok(deleted)
2779        } else {
2780            Ok(false)
2781        }
2782    }
2783
2784    fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2785        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2786        let PatchEntityInput {
2787            collection,
2788            id,
2789            payload,
2790            operations,
2791        } = input;
2792        let db = self.db();
2793        let store = db.store();
2794        let Some(manager) = store.get_collection(&collection) else {
2795            return Err(crate::RedDBError::NotFound(format!(
2796                "collection not found: {collection}"
2797            )));
2798        };
2799        let Some(entity) = manager.get(id) else {
2800            return Err(crate::RedDBError::NotFound(format!(
2801                "entity not found: {}",
2802                id.raw()
2803            )));
2804        };
2805        self.apply_loaded_patch_entity(collection, entity, payload, operations)
2806    }
2807
2808    fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
2809        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2810        let store = self.db().store();
2811        // Snapshot row fields before delete so we can mirror the removal
2812        // into every secondary index. The fetch is best-effort: if the
2813        // entity is already gone, the delete below is a no-op anyway.
2814        let pre_delete_fields = store
2815            .get(&input.collection, input.id)
2816            .as_ref()
2817            .map(entity_row_fields_snapshot)
2818            .unwrap_or_default();
2819        // Store delete first (source of truth). Crash between here and index removal
2820        // leaves the entity invisible to most queries but recoverable; the reverse
2821        // (remove index first, then crash) leaves the entity permanently orphaned.
2822        let deleted = store
2823            .delete(&input.collection, input.id)
2824            .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2825        if deleted {
2826            store.context_index().remove_entity(input.id);
2827            // Secondary index maintenance — surface only registry-shape
2828            // errors; missing-index removals are tolerated inside the call.
2829            if !pre_delete_fields.is_empty() {
2830                self.index_store_ref()
2831                    .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
2832                    .map_err(crate::RedDBError::Internal)?;
2833            }
2834            self.cdc_emit(
2835                crate::replication::cdc::ChangeOperation::Delete,
2836                &input.collection,
2837                input.id.raw(),
2838                "entity",
2839            );
2840        }
2841        Ok(DeleteEntityOutput {
2842            deleted,
2843            id: input.id,
2844        })
2845    }
2846}