Skip to main content

reddb_server/application/
ports_impls_entity.rs

1use std::collections::HashMap;
2
3use crate::application::entity::{
4    AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5    RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8    has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21    db: &crate::storage::unified::devx::RedDB,
22    collection: &str,
23    metadata: &mut Vec<(String, MetadataValue)>,
24) {
25    if has_internal_ttl_metadata(metadata) {
26        return;
27    }
28
29    let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30        return;
31    };
32
33    metadata.push((
34        "_ttl_ms".to_string(),
35        if default_ttl_ms <= i64::MAX as u64 {
36            MetadataValue::Int(default_ttl_ms as i64)
37        } else {
38            MetadataValue::Timestamp(default_ttl_ms)
39        },
40    ));
41}
42
43fn refresh_context_index(
44    db: &crate::storage::unified::devx::RedDB,
45    collection: &str,
46    id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48    let store = db.store();
49    let Some(entity) = store.get(collection, id) else {
50        return Ok(());
51    };
52
53    store.context_index().index_entity(collection, &entity);
54    Ok(())
55}
56
57/// Pull `(name, value)` pairs for every named column on a row entity.
58/// Returns empty if the entity is not a row, or if the row carries
59/// neither a `named` map nor a `schema` Arc — both of those mean the
60/// names aren't recoverable here, so secondary-index maintenance has
61/// nothing to act on. Used by the delete + update paths.
62pub(crate) fn entity_row_fields_snapshot(
63    entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65    let crate::storage::EntityData::Row(row) = &entity.data else {
66        return Vec::new();
67    };
68    if let Some(named) = &row.named {
69        return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70    }
71    if let Some(schema) = &row.schema {
72        return schema
73            .iter()
74            .zip(row.columns.iter())
75            .map(|(name, value)| (name.clone(), value.clone()))
76            .collect();
77    }
78    Vec::new()
79}
80
81fn ensure_collection_model_contract(
82    db: &crate::storage::unified::devx::RedDB,
83    collection: &str,
84    requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86    if let Some(contract) = db.collection_contract(collection) {
87        if !contract_enforces_model(&contract) {
88            return Ok(());
89        }
90        if collection_model_allows(contract.declared_model, requested_model) {
91            return Ok(());
92        }
93        return Err(crate::RedDBError::InvalidOperation(format!(
94            "collection '{}' is declared as '{}' and does not allow '{}' writes",
95            collection,
96            collection_model_name(contract.declared_model),
97            collection_model_name(requested_model)
98        )));
99    }
100
101    let now = implicit_contract_unix_ms();
102    db.save_collection_contract(crate::physical::CollectionContract {
103        name: collection.to_string(),
104        declared_model: requested_model,
105        schema_mode: crate::catalog::SchemaMode::Dynamic,
106        origin: crate::physical::ContractOrigin::Implicit,
107        version: 1,
108        created_at_unix_ms: now,
109        updated_at_unix_ms: now,
110        default_ttl_ms: db.collection_default_ttl_ms(collection),
111        context_index_fields: Vec::new(),
112        declared_columns: Vec::new(),
113        table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
114            .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
115        timestamps_enabled: false,
116        context_index_enabled: false,
117        // Implicit contracts are created on first write — mutability
118        // is the default until the operator runs explicit DDL.
119        append_only: false,
120        subscriptions: Vec::new(),
121    })
122    .map(|_| ())
123    .map_err(|err| crate::RedDBError::Internal(err.to_string()))
124}
125
126fn implicit_contract_unix_ms() -> u128 {
127    std::time::SystemTime::now()
128        .duration_since(std::time::UNIX_EPOCH)
129        .unwrap_or_default()
130        .as_millis()
131}
132
133fn collection_model_allows(
134    declared_model: crate::catalog::CollectionModel,
135    requested_model: crate::catalog::CollectionModel,
136) -> bool {
137    declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
138}
139
140fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
141    match model {
142        crate::catalog::CollectionModel::Table => "table",
143        crate::catalog::CollectionModel::Document => "document",
144        crate::catalog::CollectionModel::Graph => "graph",
145        crate::catalog::CollectionModel::Vector => "vector",
146        crate::catalog::CollectionModel::Kv => "kv",
147        crate::catalog::CollectionModel::Config => "config",
148        crate::catalog::CollectionModel::Vault => "vault",
149        crate::catalog::CollectionModel::Mixed => "mixed",
150        crate::catalog::CollectionModel::TimeSeries => "timeseries",
151        crate::catalog::CollectionModel::Queue => "queue",
152    }
153}
154
155#[derive(Clone)]
156struct UniquenessRule {
157    name: String,
158    columns: Vec<String>,
159    primary_key: bool,
160}
161
162#[derive(Copy, Clone, PartialEq, Eq)]
163enum NormalizeMode {
164    /// First write for this row. Timestamps auto-filled from now on
165    /// both `created_at` and `updated_at`; user attempts to set
166    /// either column are rejected.
167    Insert,
168    /// Update/patch path. `created_at` is preserved from the existing
169    /// row (immutable after insert); `updated_at` is bumped to now.
170    /// User attempts to set either via the patch are rejected.
171    Update,
172}
173
174mod collection_contract_enforcement {
175    use super::*;
176
177    pub(super) struct CollectionContractWriteEnforcer<'a> {
178        db: &'a crate::storage::unified::devx::RedDB,
179        collection: &'a str,
180    }
181
182    impl<'a> CollectionContractWriteEnforcer<'a> {
183        pub(super) fn new(
184            db: &'a crate::storage::unified::devx::RedDB,
185            collection: &'a str,
186        ) -> Self {
187            Self { db, collection }
188        }
189
190        pub(super) fn ensure_model(
191            &self,
192            requested_model: crate::catalog::CollectionModel,
193        ) -> RedDBResult<()> {
194            ensure_collection_model_contract(self.db, self.collection, requested_model)
195        }
196
197        pub(super) fn normalize_insert_fields(
198            &self,
199            fields: Vec<(String, Value)>,
200        ) -> RedDBResult<Vec<(String, Value)>> {
201            normalize_row_fields_for_contract_with_mode(
202                self.db,
203                self.collection,
204                fields,
205                NormalizeMode::Insert,
206            )
207        }
208
209        pub(super) fn normalize_update_fields(
210            &self,
211            fields: Vec<(String, Value)>,
212        ) -> RedDBResult<Vec<(String, Value)>> {
213            normalize_row_fields_for_contract_with_mode(
214                self.db,
215                self.collection,
216                fields,
217                NormalizeMode::Update,
218            )
219        }
220
221        pub(super) fn enforce_row_uniqueness(
222            &self,
223            fields: &[(String, Value)],
224            exclude_id: Option<crate::storage::EntityId>,
225        ) -> RedDBResult<()> {
226            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
227        }
228
229        pub(super) fn enforce_batch_uniqueness(
230            &self,
231            rows: &[Vec<(String, Value)>],
232        ) -> RedDBResult<()> {
233            enforce_row_batch_uniqueness(self.db, self.collection, rows)
234        }
235
236        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
237            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
238        }
239    }
240}
241
242use collection_contract_enforcement::CollectionContractWriteEnforcer;
243
244fn normalize_row_fields_for_contract_with_mode(
245    db: &crate::storage::unified::devx::RedDB,
246    collection: &str,
247    fields: Vec<(String, Value)>,
248    mode: NormalizeMode,
249) -> RedDBResult<Vec<(String, Value)>> {
250    let Some(contract) = db.collection_contract(collection) else {
251        return Ok(fields);
252    };
253
254    if contract.declared_model != crate::catalog::CollectionModel::Table
255        || (contract.declared_columns.is_empty()
256            && contract
257                .table_def
258                .as_ref()
259                .map(|table| table.columns.is_empty())
260                .unwrap_or(true))
261    {
262        return Ok(fields);
263    }
264
265    // Capture the pre-normalize value of created_at (if present) so
266    // Update mode can preserve it. Also capture updated_at to detect
267    // user attempts to set it via the patch payload.
268    //
269    // Heuristic for Update mode: if fields ALREADY contains a
270    // `created_at` whose value matches the row's on-disk entity, the
271    // caller is the patch pipeline carrying forward an auto-populated
272    // column — not a user mutation. Allow pass-through in that case,
273    // then restore the original value at the end.
274    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
275        fields
276            .iter()
277            .find(|(n, _)| n == "created_at")
278            .map(|(_, v)| v.clone())
279    } else {
280        None
281    };
282
283    // Reject user attempts to set runtime-managed timestamp columns.
284    // On Insert we reject any mention; on Update we only reject when
285    // the patch pipeline handed us a NEW value (not the one we
286    // auto-populated during the last insert).
287    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
288        for (name, _) in &fields {
289            if name == "created_at" || name == "updated_at" {
290                return Err(crate::RedDBError::Query(format!(
291                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
292                    collection, name
293                )));
294            }
295        }
296    }
297
298    let mut provided = std::collections::BTreeMap::new();
299    for (name, value) in &fields {
300        provided.insert(name.clone(), value.clone());
301    }
302
303    let resolved_columns = resolved_contract_columns(&contract)?;
304    let declared_names: std::collections::BTreeSet<String> = resolved_columns
305        .iter()
306        .map(|column| column.name.clone())
307        .collect();
308    let unknown_fields: Vec<String> = fields
309        .iter()
310        .filter(|(name, _)| !declared_names.contains(name))
311        .map(|(name, _)| name.clone())
312        .collect();
313    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
314        && !unknown_fields.is_empty()
315    {
316        return Err(crate::RedDBError::Query(format!(
317            "collection '{}' is strict and does not allow undeclared fields: {}",
318            collection,
319            unknown_fields.join(", ")
320        )));
321    }
322    let mut normalized = Vec::new();
323    let now_ms = current_unix_ms_u64();
324
325    for column in &resolved_columns {
326        match provided.remove(&column.name) {
327            Some(value) => {
328                // Runtime-managed columns on Update: always overwrite
329                // with the runtime's own value (preserved created_at
330                // or fresh updated_at). User mutations are silently
331                // discarded because we reject them earlier.
332                if contract.timestamps_enabled && mode == NormalizeMode::Update {
333                    match column.name.as_str() {
334                        "created_at" => {
335                            normalized.push((
336                                column.name.clone(),
337                                existing_created_at
338                                    .clone()
339                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
340                            ));
341                            continue;
342                        }
343                        "updated_at" => {
344                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
345                            continue;
346                        }
347                        _ => {}
348                    }
349                }
350                normalized.push((
351                    column.name.clone(),
352                    normalize_contract_value(collection, column, value)?,
353                ));
354            }
355            None => {
356                // Runtime-managed timestamp columns: auto-fill with now
357                // when the contract opted in. Both get the same value on
358                // first insert so callers can order by either.
359                if contract.timestamps_enabled
360                    && (column.name == "created_at" || column.name == "updated_at")
361                {
362                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
363                    continue;
364                }
365                if let Some(default) = &column.default {
366                    normalized.push((
367                        column.name.clone(),
368                        coerce_contract_literal(collection, &column.name, column, default)?,
369                    ));
370                } else if column.not_null {
371                    return Err(crate::RedDBError::Query(format!(
372                        "missing required column '{}' for collection '{}'",
373                        column.name, collection
374                    )));
375                }
376            }
377        }
378    }
379
380    for (name, value) in fields {
381        if !declared_names.contains(&name) {
382            normalized.push((name, value));
383        }
384    }
385
386    Ok(normalized)
387}
388
389fn current_unix_ms_u64() -> u64 {
390    std::time::SystemTime::now()
391        .duration_since(std::time::UNIX_EPOCH)
392        .map(|d| d.as_millis() as u64)
393        .unwrap_or(0)
394}
395
396fn enforce_row_uniqueness(
397    db: &crate::storage::unified::devx::RedDB,
398    collection: &str,
399    fields: &[(String, Value)],
400    exclude_id: Option<crate::storage::EntityId>,
401) -> RedDBResult<()> {
402    let Some(contract) = db.collection_contract(collection) else {
403        return Ok(());
404    };
405    if contract.declared_model != crate::catalog::CollectionModel::Table
406        && contract.declared_model != crate::catalog::CollectionModel::Mixed
407    {
408        return Ok(());
409    }
410
411    let rules = resolved_uniqueness_rules(&contract);
412    if rules.is_empty() {
413        return Ok(());
414    }
415
416    let store = db.store();
417    let Some(manager) = store.get_collection(collection) else {
418        return Ok(());
419    };
420
421    let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
422
423    for rule in &rules {
424        let mut expected_signatures = Vec::new();
425        let mut skip_rule = false;
426
427        for column in &rule.columns {
428            match input_fields.get(column) {
429                Some(Value::Null) | None if rule.primary_key => {
430                    return Err(crate::RedDBError::Query(format!(
431                        "primary key '{}' in collection '{}' requires non-null column '{}'",
432                        rule.name, collection, column
433                    )))
434                }
435                Some(Value::Null) | None => {
436                    skip_rule = true;
437                    break;
438                }
439                Some(value) => {
440                    expected_signatures.push((column.clone(), value_signature(value)));
441                }
442            }
443        }
444
445        if skip_rule {
446            continue;
447        }
448
449        for entity in manager.query_all(|_| true) {
450            if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
451                continue;
452            }
453            let Some(existing_fields) = row_fields_from_entity(&entity) else {
454                continue;
455            };
456
457            let duplicate = expected_signatures.iter().all(|(column, expected)| {
458                existing_fields
459                    .get(column)
460                    .map(|value| value_signature(value) == *expected)
461                    .unwrap_or(false)
462            });
463
464            if duplicate {
465                let qualifier = if rule.primary_key {
466                    "primary key"
467                } else {
468                    "unique constraint"
469                };
470                return Err(crate::RedDBError::Query(format!(
471                    "{} '{}' violated on collection '{}' for columns [{}]",
472                    qualifier,
473                    rule.name,
474                    collection,
475                    rule.columns.join(", ")
476                )));
477            }
478        }
479    }
480
481    Ok(())
482}
483
484fn enforce_row_batch_uniqueness(
485    db: &crate::storage::unified::devx::RedDB,
486    collection: &str,
487    rows: &[Vec<(String, Value)>],
488) -> RedDBResult<()> {
489    let Some(contract) = db.collection_contract(collection) else {
490        return Ok(());
491    };
492    if contract.declared_model != crate::catalog::CollectionModel::Table
493        && contract.declared_model != crate::catalog::CollectionModel::Mixed
494    {
495        return Ok(());
496    }
497
498    let rules = resolved_uniqueness_rules(&contract);
499    if rules.is_empty() {
500        return Ok(());
501    }
502
503    for rule in &rules {
504        let mut seen = std::collections::HashMap::<String, usize>::new();
505        for (row_index, fields) in rows.iter().enumerate() {
506            let input_fields: std::collections::BTreeMap<String, Value> =
507                fields.iter().cloned().collect();
508            let mut signatures = Vec::new();
509            let mut skip_rule = false;
510
511            for column in &rule.columns {
512                match input_fields.get(column) {
513                    Some(Value::Null) | None if rule.primary_key => {
514                        return Err(crate::RedDBError::Query(format!(
515                            "primary key '{}' in collection '{}' requires non-null column '{}'",
516                            rule.name, collection, column
517                        )))
518                    }
519                    Some(Value::Null) | None => {
520                        skip_rule = true;
521                        break;
522                    }
523                    Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
524                }
525            }
526
527            if skip_rule {
528                continue;
529            }
530
531            let signature = signatures.join("|");
532            if let Some(previous_index) = seen.insert(signature, row_index) {
533                return Err(crate::RedDBError::Query(format!(
534                    "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
535                    rule.name,
536                    collection,
537                    previous_index + 1,
538                    row_index + 1
539                )));
540            }
541        }
542    }
543
544    Ok(())
545}
546
547fn row_update_requires_uniqueness_check(
548    db: &crate::storage::unified::devx::RedDB,
549    collection: &str,
550    modified_columns: &[String],
551) -> bool {
552    if modified_columns.is_empty() {
553        return false;
554    }
555
556    let Some(contract) = db.collection_contract(collection) else {
557        return false;
558    };
559    if contract.declared_model != crate::catalog::CollectionModel::Table
560        && contract.declared_model != crate::catalog::CollectionModel::Mixed
561    {
562        return false;
563    }
564
565    let rules = resolved_uniqueness_rules(&contract);
566    if rules.is_empty() {
567        return false;
568    }
569
570    rules.iter().any(|rule| {
571        rule.columns.iter().any(|column| {
572            modified_columns
573                .iter()
574                .any(|modified| modified.eq_ignore_ascii_case(column))
575        })
576    })
577}
578
579pub(crate) fn build_row_update_contract_plan(
580    db: &crate::storage::unified::devx::RedDB,
581    collection: &str,
582) -> RedDBResult<Option<RowUpdateContractPlan>> {
583    let Some(contract) = db.collection_contract(collection) else {
584        return Ok(None);
585    };
586
587    let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
588        && !(contract.declared_columns.is_empty()
589            && contract
590                .table_def
591                .as_ref()
592                .map(|table| table.columns.is_empty())
593                .unwrap_or(true))
594    {
595        resolved_contract_columns(&contract)?
596            .into_iter()
597            .map(|rule| {
598                (
599                    rule.name.clone(),
600                    RowUpdateColumnRule {
601                        name: rule.name,
602                        data_type: rule.data_type,
603                        data_type_name: rule.data_type_name,
604                        not_null: rule.not_null,
605                        enum_variants: rule.enum_variants,
606                    },
607                )
608            })
609            .collect()
610    } else {
611        HashMap::new()
612    };
613
614    let unique_columns = if matches!(
615        contract.declared_model,
616        crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
617    ) {
618        resolved_uniqueness_rules(&contract)
619            .into_iter()
620            .flat_map(|rule| rule.columns.into_iter())
621            .map(|column| (column, ()))
622            .collect()
623    } else {
624        HashMap::new()
625    };
626
627    Ok(Some(RowUpdateContractPlan {
628        timestamps_enabled: contract.timestamps_enabled,
629        strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
630        declared_rules,
631        unique_columns,
632    }))
633}
634
635pub(crate) fn normalize_row_update_assignment_with_plan(
636    collection: &str,
637    column: &str,
638    value: Value,
639    row_contract_plan: Option<&RowUpdateContractPlan>,
640) -> RedDBResult<Value> {
641    let Some(plan) = row_contract_plan else {
642        return Ok(value);
643    };
644
645    if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
646        return Err(crate::RedDBError::Query(format!(
647            "collection '{}' manages '{}' automatically — do not set it in UPDATE",
648            collection, column
649        )));
650    }
651
652    if let Some(rule) = plan.declared_rules.get(column) {
653        let rule = ResolvedColumnRule {
654            name: rule.name.clone(),
655            data_type: rule.data_type,
656            data_type_name: rule.data_type_name.clone(),
657            not_null: rule.not_null,
658            default: None,
659            enum_variants: rule.enum_variants.clone(),
660        };
661        normalize_contract_value(collection, &rule, value)
662    } else if plan.strict_schema {
663        Err(crate::RedDBError::Query(format!(
664            "collection '{}' is strict and does not allow undeclared fields: {}",
665            collection, column
666        )))
667    } else {
668        Ok(value)
669    }
670}
671
672pub(crate) fn normalize_row_update_value_for_rule(
673    collection: &str,
674    value: Value,
675    row_rule: Option<&RowUpdateColumnRule>,
676) -> RedDBResult<Value> {
677    let Some(rule) = row_rule else {
678        return Ok(value);
679    };
680
681    let rule = ResolvedColumnRule {
682        name: rule.name.clone(),
683        data_type: rule.data_type,
684        data_type_name: rule.data_type_name.clone(),
685        not_null: rule.not_null,
686        default: None,
687        enum_variants: rule.enum_variants.clone(),
688    };
689    normalize_contract_value(collection, &rule, value)
690}
691
692fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
693    if let Some(named) = row.named.as_mut() {
694        named.insert(name.to_string(), value);
695        return;
696    }
697
698    if let Some(schema) = row.schema.as_ref() {
699        if let Some(index) = schema.iter().position(|column| column == name) {
700            if let Some(slot) = row.columns.get_mut(index) {
701                *slot = value;
702                return;
703            }
704        }
705
706        let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
707        for (column, current) in schema.iter().zip(row.columns.iter()) {
708            named.insert(column.clone(), current.clone());
709        }
710        named.insert(name.to_string(), value);
711        row.named = Some(named);
712        return;
713    }
714
715    let mut named = HashMap::with_capacity(1);
716    named.insert(name.to_string(), value);
717    row.named = Some(named);
718}
719
720fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
721    if let Some(named) = row.named.as_ref() {
722        named
723            .iter()
724            .map(|(key, value)| (key.clone(), value.clone()))
725            .collect()
726    } else if let Some(schema) = row.schema.as_ref() {
727        schema
728            .iter()
729            .cloned()
730            .zip(row.columns.iter().cloned())
731            .collect()
732    } else {
733        Vec::new()
734    }
735}
736
737fn apply_row_field_assignments_raw<I>(
738    row: &mut crate::storage::unified::entity::RowData,
739    field_assignments: I,
740) where
741    I: IntoIterator<Item = (String, Value)>,
742{
743    for (column, value) in field_assignments {
744        set_row_field(row, &column, value);
745    }
746}
747
748fn apply_row_field_assignments_incremental<I>(
749    collection: &str,
750    row: &mut crate::storage::unified::entity::RowData,
751    field_assignments: I,
752    row_contract_plan: Option<&RowUpdateContractPlan>,
753) -> RedDBResult<()>
754where
755    I: IntoIterator<Item = (String, Value)>,
756{
757    for (column, value) in field_assignments {
758        let value = normalize_row_update_assignment_with_plan(
759            collection,
760            &column,
761            value,
762            row_contract_plan,
763        )?;
764
765        set_row_field(row, &column, value);
766    }
767
768    Ok(())
769}
770
771fn resolved_uniqueness_rules(
772    contract: &crate::physical::CollectionContract,
773) -> Vec<UniquenessRule> {
774    let mut rules = Vec::new();
775
776    if let Some(table_def) = &contract.table_def {
777        if !table_def.primary_key.is_empty() {
778            rules.push(UniquenessRule {
779                name: "primary_key".to_string(),
780                columns: table_def.primary_key.clone(),
781                primary_key: true,
782            });
783        }
784
785        for constraint in &table_def.constraints {
786            if matches!(
787                constraint.constraint_type,
788                crate::storage::schema::ConstraintType::PrimaryKey
789            ) && !constraint.columns.is_empty()
790            {
791                rules.push(UniquenessRule {
792                    name: constraint.name.clone(),
793                    columns: constraint.columns.clone(),
794                    primary_key: true,
795                });
796            } else if matches!(
797                constraint.constraint_type,
798                crate::storage::schema::ConstraintType::Unique
799            ) && !constraint.columns.is_empty()
800            {
801                rules.push(UniquenessRule {
802                    name: constraint.name.clone(),
803                    columns: constraint.columns.clone(),
804                    primary_key: false,
805                });
806            }
807        }
808    } else {
809        for column in &contract.declared_columns {
810            if column.primary_key {
811                rules.push(UniquenessRule {
812                    name: format!("pk_{}", column.name),
813                    columns: vec![column.name.clone()],
814                    primary_key: true,
815                });
816            } else if column.unique {
817                rules.push(UniquenessRule {
818                    name: format!("uniq_{}", column.name),
819                    columns: vec![column.name.clone()],
820                    primary_key: false,
821                });
822            }
823        }
824    }
825
826    let mut dedup = std::collections::BTreeSet::new();
827    rules
828        .into_iter()
829        .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
830        .collect()
831}
832
833fn row_fields_from_entity(
834    entity: &crate::storage::UnifiedEntity,
835) -> Option<std::collections::BTreeMap<String, Value>> {
836    match &entity.data {
837        crate::storage::EntityData::Row(row) => {
838            if let Some(named) = &row.named {
839                Some(
840                    named
841                        .iter()
842                        .map(|(key, value)| (key.clone(), value.clone()))
843                        .collect(),
844                )
845            } else {
846                row.schema.as_ref().map(|schema| {
847                    schema
848                        .iter()
849                        .cloned()
850                        .zip(row.columns.iter().cloned())
851                        .collect()
852                })
853            }
854        }
855        _ => None,
856    }
857}
858
859fn value_signature(value: &Value) -> String {
860    format!("{value:?}")
861}
862
863fn normalize_contract_value(
864    collection: &str,
865    column: &ResolvedColumnRule,
866    value: Value,
867) -> RedDBResult<Value> {
868    if matches!(value, Value::Null) {
869        if column.not_null {
870            return Err(crate::RedDBError::Query(format!(
871                "column '{}' in collection '{}' cannot be null",
872                column.name, collection
873            )));
874        }
875        return Ok(Value::Null);
876    }
877
878    let target = column.data_type;
879    if value_matches_declared_type(&value, target) {
880        return Ok(value);
881    }
882
883    let Some(raw) = value_to_coercion_input(&value) else {
884        return Err(crate::RedDBError::Query(format!(
885            "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
886            column.name, collection, column.data_type_name
887        )));
888    };
889
890    coerce_contract_literal(collection, &column.name, column, &raw)
891}
892
893fn coerce_contract_literal(
894    collection: &str,
895    column_name: &str,
896    column: &ResolvedColumnRule,
897    raw: &str,
898) -> RedDBResult<Value> {
899    let target = column.data_type;
900    match target {
901        DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
902        DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
903        DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
904            crate::RedDBError::Query(format!(
905                "failed to coerce column '{}' in collection '{}' to '{}': {}",
906                column_name, collection, column.data_type_name, err
907            ))
908        }),
909        DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
910            crate::RedDBError::Query(format!(
911                "failed to coerce column '{}' in collection '{}' to '{}': {}",
912                column_name, collection, column.data_type_name, err
913            ))
914        }),
915        DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
916            "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
917            column_name, collection, column.data_type_name
918        ))),
919        _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
920            |err| {
921                crate::RedDBError::Query(format!(
922                    "failed to coerce column '{}' in collection '{}' to '{}': {}",
923                    column_name, collection, column.data_type_name, err
924                ))
925            },
926        ),
927    }
928}
929
930struct ResolvedColumnRule {
931    name: String,
932    data_type: DataType,
933    data_type_name: String,
934    not_null: bool,
935    default: Option<String>,
936    enum_variants: Vec<String>,
937}
938
939fn resolved_contract_columns(
940    contract: &crate::physical::CollectionContract,
941) -> RedDBResult<Vec<ResolvedColumnRule>> {
942    if let Some(table_def) = &contract.table_def {
943        return Ok(table_def
944            .columns
945            .iter()
946            .map(|column| ResolvedColumnRule {
947                name: column.name.clone(),
948                data_type: column.data_type,
949                data_type_name: data_type_name(column.data_type).to_string(),
950                not_null: !column.nullable,
951                default: column
952                    .default
953                    .as_ref()
954                    .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
955                enum_variants: column.enum_variants.clone(),
956            })
957            .collect());
958    }
959
960    contract
961        .declared_columns
962        .iter()
963        .map(|column| {
964            let data_type = column
965                .sql_type
966                .as_ref()
967                .map(crate::storage::query::resolve_sql_type_name)
968                .transpose()
969                .map_err(|err| crate::RedDBError::Query(err.to_string()))?
970                .unwrap_or(parse_declared_data_type(&column.data_type)?);
971            Ok(ResolvedColumnRule {
972                name: column.name.clone(),
973                data_type,
974                data_type_name: column.data_type.clone(),
975                not_null: column.not_null,
976                default: column.default.clone(),
977                enum_variants: column.enum_variants.clone(),
978            })
979        })
980        .collect()
981}
982
983fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
984    resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
985}
986
987fn data_type_name(data_type: DataType) -> &'static str {
988    match data_type {
989        DataType::Integer => "integer",
990        DataType::UnsignedInteger => "unsigned_integer",
991        DataType::Float => "float",
992        DataType::Text => "text",
993        DataType::Blob => "blob",
994        DataType::Boolean => "boolean",
995        DataType::Timestamp => "timestamp",
996        DataType::Duration => "duration",
997        DataType::IpAddr => "ipaddr",
998        DataType::MacAddr => "macaddr",
999        DataType::Vector => "vector",
1000        DataType::Nullable => "nullable",
1001        DataType::Unknown => "unknown",
1002        DataType::Json => "json",
1003        DataType::Uuid => "uuid",
1004        DataType::NodeRef => "noderef",
1005        DataType::EdgeRef => "edgeref",
1006        DataType::VectorRef => "vectorref",
1007        DataType::RowRef => "rowref",
1008        DataType::Color => "color",
1009        DataType::Email => "email",
1010        DataType::Url => "url",
1011        DataType::Phone => "phone",
1012        DataType::Semver => "semver",
1013        DataType::Cidr => "cidr",
1014        DataType::Date => "date",
1015        DataType::Time => "time",
1016        DataType::Decimal => "decimal",
1017        DataType::Enum => "enum",
1018        DataType::Array => "array",
1019        DataType::TimestampMs => "timestamp_ms",
1020        DataType::Ipv4 => "ipv4",
1021        DataType::Ipv6 => "ipv6",
1022        DataType::Subnet => "subnet",
1023        DataType::Port => "port",
1024        DataType::Latitude => "latitude",
1025        DataType::Longitude => "longitude",
1026        DataType::GeoPoint => "geopoint",
1027        DataType::Country2 => "country2",
1028        DataType::Country3 => "country3",
1029        DataType::Lang2 => "lang2",
1030        DataType::Lang5 => "lang5",
1031        DataType::Currency => "currency",
1032        DataType::AssetCode => "asset_code",
1033        DataType::Money => "money",
1034        DataType::ColorAlpha => "color_alpha",
1035        DataType::BigInt => "bigint",
1036        DataType::KeyRef => "keyref",
1037        DataType::DocRef => "docref",
1038        DataType::TableRef => "tableref",
1039        DataType::PageRef => "pageref",
1040        DataType::Secret => "secret",
1041        DataType::Password => "password",
1042        DataType::TextZstd => "text",
1043        DataType::BlobZstd => "blob",
1044    }
1045}
1046
1047fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1048    matches!(
1049        (value, target),
1050        (Value::Null, _)
1051            | (Value::Integer(_), DataType::Integer)
1052            | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1053            | (Value::Float(_), DataType::Float)
1054            | (Value::Text(_), DataType::Text)
1055            | (Value::Blob(_), DataType::Blob)
1056            | (Value::Boolean(_), DataType::Boolean)
1057            | (Value::Timestamp(_), DataType::Timestamp)
1058            | (Value::Duration(_), DataType::Duration)
1059            | (Value::IpAddr(_), DataType::IpAddr)
1060            | (Value::MacAddr(_), DataType::MacAddr)
1061            | (Value::Vector(_), DataType::Vector)
1062            | (Value::Json(_), DataType::Json)
1063            | (Value::Uuid(_), DataType::Uuid)
1064            | (Value::NodeRef(_), DataType::NodeRef)
1065            | (Value::EdgeRef(_), DataType::EdgeRef)
1066            | (Value::VectorRef(_, _), DataType::VectorRef)
1067            | (Value::RowRef(_, _), DataType::RowRef)
1068            | (Value::Color(_), DataType::Color)
1069            | (Value::Email(_), DataType::Email)
1070            | (Value::Url(_), DataType::Url)
1071            | (Value::Phone(_), DataType::Phone)
1072            | (Value::Semver(_), DataType::Semver)
1073            | (Value::Cidr(_, _), DataType::Cidr)
1074            | (Value::Date(_), DataType::Date)
1075            | (Value::Time(_), DataType::Time)
1076            | (Value::Decimal(_), DataType::Decimal)
1077            | (Value::EnumValue(_), DataType::Enum)
1078            | (Value::Array(_), DataType::Array)
1079            | (Value::TimestampMs(_), DataType::TimestampMs)
1080            | (Value::Ipv4(_), DataType::Ipv4)
1081            | (Value::Ipv6(_), DataType::Ipv6)
1082            | (Value::Subnet(_, _), DataType::Subnet)
1083            | (Value::Port(_), DataType::Port)
1084            | (Value::Latitude(_), DataType::Latitude)
1085            | (Value::Longitude(_), DataType::Longitude)
1086            | (Value::GeoPoint(_, _), DataType::GeoPoint)
1087            | (Value::Country2(_), DataType::Country2)
1088            | (Value::Country3(_), DataType::Country3)
1089            | (Value::Lang2(_), DataType::Lang2)
1090            | (Value::Lang5(_), DataType::Lang5)
1091            | (Value::Currency(_), DataType::Currency)
1092            | (Value::ColorAlpha(_), DataType::ColorAlpha)
1093            | (Value::BigInt(_), DataType::BigInt)
1094            | (Value::KeyRef(_, _), DataType::KeyRef)
1095            | (Value::DocRef(_, _), DataType::DocRef)
1096            | (Value::TableRef(_), DataType::TableRef)
1097            | (Value::PageRef(_), DataType::PageRef)
1098            | (Value::Secret(_), DataType::Secret)
1099            | (Value::Password(_), DataType::Password)
1100    )
1101}
1102
1103fn value_to_coercion_input(value: &Value) -> Option<String> {
1104    match value {
1105        Value::Null => None,
1106        Value::Integer(value) => Some(value.to_string()),
1107        Value::UnsignedInteger(value) => Some(value.to_string()),
1108        Value::Float(value) => Some(value.to_string()),
1109        Value::Text(value) => Some(value.to_string()),
1110        Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1111        Value::Boolean(value) => Some(value.to_string()),
1112        Value::Timestamp(value) => Some(value.to_string()),
1113        Value::Duration(value) => Some(value.to_string()),
1114        Value::IpAddr(value) => Some(value.to_string()),
1115        Value::MacAddr(value) => Some(format!(
1116            "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1117            value[0], value[1], value[2], value[3], value[4], value[5]
1118        )),
1119        Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1120        Value::Email(value) => Some(value.clone()),
1121        Value::Url(value) => Some(value.clone()),
1122        Value::Phone(value) => Some(value.to_string()),
1123        Value::Semver(value) => Some(format!(
1124            "{}.{}.{}",
1125            value / 1_000_000,
1126            (value / 1_000) % 1_000,
1127            value % 1_000
1128        )),
1129        Value::Date(value) => Some(value.to_string()),
1130        Value::Time(value) => Some(value.to_string()),
1131        Value::Decimal(value) => Some(value.to_string()),
1132        Value::TimestampMs(value) => Some(value.to_string()),
1133        Value::Ipv4(value) => Some(format!(
1134            "{}.{}.{}.{}",
1135            (value >> 24) & 0xFF,
1136            (value >> 16) & 0xFF,
1137            (value >> 8) & 0xFF,
1138            value & 0xFF
1139        )),
1140        Value::Port(value) => Some(value.to_string()),
1141        Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1142        Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1143        Value::GeoPoint(lat, lon) => Some(format!(
1144            "{},{}",
1145            *lat as f64 / 1_000_000.0,
1146            *lon as f64 / 1_000_000.0
1147        )),
1148        Value::BigInt(value) => Some(value.to_string()),
1149        Value::TableRef(value) => Some(value.clone()),
1150        Value::PageRef(value) => Some(value.to_string()),
1151        Value::Password(value) => Some(value.clone()),
1152        _ => None,
1153    }
1154}
1155
1156fn dedupe_modified_columns(mut modified_columns: Vec<String>) -> Vec<String> {
1157    if modified_columns.is_empty() {
1158        return modified_columns;
1159    }
1160
1161    let mut unique = Vec::with_capacity(modified_columns.len());
1162    for column in modified_columns.drain(..) {
1163        if !unique
1164            .iter()
1165            .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1166        {
1167            unique.push(column);
1168        }
1169    }
1170    unique
1171}
1172
1173impl RedDBRuntime {
1174    pub(crate) fn apply_loaded_patch_entity_core(
1175        &self,
1176        collection: String,
1177        mut entity: crate::storage::UnifiedEntity,
1178        payload: JsonValue,
1179        operations: Vec<PatchEntityOperation>,
1180    ) -> RedDBResult<AppliedEntityMutation> {
1181        let id = entity.id;
1182        let operations = normalize_ttl_patch_operations(operations)?;
1183        // Snapshot pre-patch row fields for the secondary-index hook —
1184        // empty for non-row entities, which is the desired no-op.
1185        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1186
1187        let db = self.db();
1188        let store = db.store();
1189        let Some(manager) = store.get_collection(&collection) else {
1190            return Err(crate::RedDBError::NotFound(format!(
1191                "collection not found: {collection}"
1192            )));
1193        };
1194
1195        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1196        let mut metadata_changed = false;
1197        let mut modified_columns: Vec<String> = Vec::new();
1198        let mut context_index_dirty = false;
1199
1200        let row_contract_timestamps = db
1201            .collection_contract(&collection)
1202            .map(|c| c.timestamps_enabled)
1203            .unwrap_or(false);
1204
1205        match &mut entity.data {
1206            crate::storage::EntityData::Row(row) => {
1207                let mut field_ops = Vec::new();
1208                let mut metadata_ops = Vec::new();
1209
1210                for mut op in operations {
1211                    let Some(root) = op.path.first().map(String::as_str) else {
1212                        return Err(crate::RedDBError::Query(
1213                            "patch path cannot be empty".to_string(),
1214                        ));
1215                    };
1216
1217                    match root {
1218                        "fields" | "named" => {
1219                            if op.path.len() < 2 {
1220                                return Err(crate::RedDBError::Query(
1221                                    "patch path 'fields' requires a nested key".to_string(),
1222                                ));
1223                            }
1224                            if row_contract_timestamps {
1225                                let leaf = op.path.get(1).map(String::as_str);
1226                                if matches!(leaf, Some("created_at") | Some("updated_at")) {
1227                                    return Err(crate::RedDBError::Query(format!(
1228                                        "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1229                                        collection,
1230                                        leaf.unwrap_or("")
1231                                    )));
1232                                }
1233                            }
1234                            op.path.remove(0);
1235                            field_ops.push(op);
1236                        }
1237                        "metadata" => {
1238                            if op.path.len() < 2 {
1239                                return Err(crate::RedDBError::Query(
1240                                    "patch path 'metadata' requires a nested key".to_string(),
1241                                ));
1242                            }
1243                            op.path.remove(0);
1244                            metadata_ops.push(op);
1245                        }
1246                        _ => {
1247                            return Err(crate::RedDBError::Query(format!(
1248                                "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1249                            )));
1250                        }
1251                    }
1252                }
1253
1254                if !field_ops.is_empty() {
1255                    context_index_dirty = true;
1256                    for op in &field_ops {
1257                        if let Some(col) = op.path.first() {
1258                            modified_columns.push(col.clone());
1259                        }
1260                    }
1261                    let named = row.named.get_or_insert_with(Default::default);
1262                    apply_patch_operations_to_storage_map(named, &field_ops)?;
1263                }
1264
1265                if let Some(fields) = payload
1266                    .get("fields")
1267                    .and_then(crate::json::Value::as_object)
1268                {
1269                    if row_contract_timestamps {
1270                        for key in fields.keys() {
1271                            if key == "created_at" || key == "updated_at" {
1272                                return Err(crate::RedDBError::Query(format!(
1273                                    "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1274                                    collection, key
1275                                )));
1276                            }
1277                        }
1278                    }
1279                    context_index_dirty = true;
1280                    let named = row.named.get_or_insert_with(Default::default);
1281                    for (key, value) in fields {
1282                        modified_columns.push(key.clone());
1283                        named.insert(key.clone(), json_to_storage_value(value)?);
1284                    }
1285                }
1286
1287                if !metadata_ops.is_empty() {
1288                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1289                    let metadata = patch_metadata.get_or_insert_with(|| {
1290                        store.get_metadata(&collection, id).unwrap_or_default()
1291                    });
1292                    let mut metadata_json = metadata_to_json(metadata);
1293                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1294                        .map_err(crate::RedDBError::Query)?;
1295                    *metadata = metadata_from_json(&metadata_json)?;
1296                    metadata_changed = true;
1297                }
1298
1299                if !modified_columns.is_empty() || row_contract_timestamps {
1300                    let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1301                    let current_fields = if let Some(named) = row.named.take() {
1302                        named.into_iter().collect::<Vec<_>>()
1303                    } else if let Some(schema) = row.schema.as_ref() {
1304                        schema
1305                            .iter()
1306                            .cloned()
1307                            .zip(row.columns.iter().cloned())
1308                            .collect::<Vec<_>>()
1309                    } else {
1310                        Vec::new()
1311                    };
1312                    let normalized_fields = contract.normalize_update_fields(current_fields)?;
1313                    if row_contract_timestamps {
1314                        modified_columns.push("updated_at".to_string());
1315                        context_index_dirty = true;
1316                    }
1317                    if contract.requires_uniqueness_check(&modified_columns) {
1318                        contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1319                    }
1320                    row.named = Some(normalized_fields.into_iter().collect());
1321                }
1322            }
1323            crate::storage::EntityData::Node(node) => {
1324                let mut field_ops = Vec::new();
1325                let mut metadata_ops = Vec::new();
1326
1327                for mut op in operations {
1328                    let Some(root) = op.path.first().map(String::as_str) else {
1329                        return Err(crate::RedDBError::Query(
1330                            "patch path cannot be empty".to_string(),
1331                        ));
1332                    };
1333
1334                    match root {
1335                        "fields" | "properties" => {
1336                            if op.path.len() < 2 {
1337                                return Err(crate::RedDBError::Query(
1338                                    "patch path 'fields' requires a nested key".to_string(),
1339                                ));
1340                            }
1341                            op.path.remove(0);
1342                            field_ops.push(op);
1343                        }
1344                        "metadata" => {
1345                            if op.path.len() < 2 {
1346                                return Err(crate::RedDBError::Query(
1347                                    "patch path 'metadata' requires a nested key".to_string(),
1348                                ));
1349                            }
1350                            op.path.remove(0);
1351                            metadata_ops.push(op);
1352                        }
1353                        _ => {
1354                            return Err(crate::RedDBError::Query(format!(
1355                                "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, or metadata/*"
1356                            )));
1357                        }
1358                    }
1359                }
1360
1361                if !field_ops.is_empty() {
1362                    context_index_dirty = true;
1363                    apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1364                }
1365
1366                if let Some(fields) = payload
1367                    .get("fields")
1368                    .and_then(crate::json::Value::as_object)
1369                {
1370                    context_index_dirty = true;
1371                    for (key, value) in fields {
1372                        node.properties
1373                            .insert(key.clone(), json_to_storage_value(value)?);
1374                    }
1375                }
1376
1377                if !metadata_ops.is_empty() {
1378                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1379                    let metadata = patch_metadata.get_or_insert_with(|| {
1380                        store.get_metadata(&collection, id).unwrap_or_default()
1381                    });
1382                    let mut metadata_json = metadata_to_json(metadata);
1383                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1384                        .map_err(crate::RedDBError::Query)?;
1385                    *metadata = metadata_from_json(&metadata_json)?;
1386                    metadata_changed = true;
1387                }
1388            }
1389            crate::storage::EntityData::Edge(edge) => {
1390                let mut field_ops = Vec::new();
1391                let mut metadata_ops = Vec::new();
1392                let mut weight_ops = Vec::new();
1393
1394                for mut op in operations {
1395                    let Some(root) = op.path.first().map(String::as_str) else {
1396                        return Err(crate::RedDBError::Query(
1397                            "patch path cannot be empty".to_string(),
1398                        ));
1399                    };
1400
1401                    match root {
1402                        "fields" | "properties" => {
1403                            if op.path.len() < 2 {
1404                                return Err(crate::RedDBError::Query(
1405                                    "patch path 'fields' requires a nested key".to_string(),
1406                                ));
1407                            }
1408                            op.path.remove(0);
1409                            field_ops.push(op);
1410                        }
1411                        "weight" => {
1412                            if op.path.len() != 1 {
1413                                return Err(crate::RedDBError::Query(
1414                                    "patch path 'weight' does not allow nested keys".to_string(),
1415                                ));
1416                            }
1417                            op.path.clear();
1418                            weight_ops.push(op);
1419                        }
1420                        "metadata" => {
1421                            if op.path.len() < 2 {
1422                                return Err(crate::RedDBError::Query(
1423                                    "patch path 'metadata' requires a nested key".to_string(),
1424                                ));
1425                            }
1426                            op.path.remove(0);
1427                            metadata_ops.push(op);
1428                        }
1429                        _ => {
1430                            return Err(crate::RedDBError::Query(format!(
1431                                "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1432                            )));
1433                        }
1434                    }
1435                }
1436
1437                if !field_ops.is_empty() {
1438                    context_index_dirty = true;
1439                    apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1440                }
1441
1442                for op in weight_ops {
1443                    context_index_dirty = true;
1444                    let value = op.value.ok_or_else(|| {
1445                        crate::RedDBError::Query("weight operations require a value".to_string())
1446                    })?;
1447
1448                    match op.op {
1449                        PatchEntityOperationType::Unset => {
1450                            return Err(crate::RedDBError::Query(
1451                                "weight cannot be unset through patch operations".to_string(),
1452                            ));
1453                        }
1454                        PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1455                            let Some(weight) = value.as_f64() else {
1456                                return Err(crate::RedDBError::Query(
1457                                    "weight operation requires a numeric value".to_string(),
1458                                ));
1459                            };
1460                            edge.weight = weight as f32;
1461                        }
1462                    }
1463                }
1464
1465                if let Some(fields) = payload
1466                    .get("fields")
1467                    .and_then(crate::json::Value::as_object)
1468                {
1469                    context_index_dirty = true;
1470                    for (key, value) in fields {
1471                        edge.properties
1472                            .insert(key.clone(), json_to_storage_value(value)?);
1473                    }
1474                }
1475
1476                if !metadata_ops.is_empty() {
1477                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1478                    let metadata = patch_metadata.get_or_insert_with(|| {
1479                        store.get_metadata(&collection, id).unwrap_or_default()
1480                    });
1481                    let mut metadata_json = metadata_to_json(metadata);
1482                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1483                        .map_err(crate::RedDBError::Query)?;
1484                    *metadata = metadata_from_json(&metadata_json)?;
1485                    metadata_changed = true;
1486                }
1487            }
1488            crate::storage::EntityData::Vector(vector) => {
1489                let mut field_ops = Vec::new();
1490                let mut metadata_ops = Vec::new();
1491
1492                for mut op in operations {
1493                    let Some(root) = op.path.first().map(String::as_str) else {
1494                        return Err(crate::RedDBError::Query(
1495                            "patch path cannot be empty".to_string(),
1496                        ));
1497                    };
1498
1499                    match root {
1500                        "fields" => {
1501                            if op.path.len() < 2 {
1502                                return Err(crate::RedDBError::Query(
1503                                    "patch path 'fields' requires a nested key".to_string(),
1504                                ));
1505                            }
1506                            op.path.remove(0);
1507                            let Some(target) = op.path.first().map(String::as_str) else {
1508                                return Err(crate::RedDBError::Query(
1509                                    "patch path requires a target under fields".to_string(),
1510                                ));
1511                            };
1512                            if !matches!(target, "dense" | "content" | "sparse") {
1513                                return Err(crate::RedDBError::Query(format!(
1514                                    "unsupported vector patch target '{target}'"
1515                                )));
1516                            }
1517                            field_ops.push(op);
1518                        }
1519                        "metadata" => {
1520                            if op.path.len() < 2 {
1521                                return Err(crate::RedDBError::Query(
1522                                    "patch path 'metadata' requires a nested key".to_string(),
1523                                ));
1524                            }
1525                            op.path.remove(0);
1526                            metadata_ops.push(op);
1527                        }
1528                        _ => {
1529                            return Err(crate::RedDBError::Query(format!(
1530                                "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1531                            )));
1532                        }
1533                    }
1534                }
1535
1536                if !field_ops.is_empty() {
1537                    context_index_dirty = true;
1538                    apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1539                }
1540
1541                if let Some(fields) = payload
1542                    .get("fields")
1543                    .and_then(crate::json::Value::as_object)
1544                {
1545                    context_index_dirty = true;
1546                    if let Some(content) =
1547                        fields.get("content").and_then(crate::json::Value::as_str)
1548                    {
1549                        vector.content = Some(content.to_string());
1550                    }
1551                    if let Some(dense) = fields.get("dense") {
1552                        vector.dense = dense
1553                            .as_array()
1554                            .ok_or_else(|| {
1555                                crate::RedDBError::Query(
1556                                    "field 'dense' must be an array".to_string(),
1557                                )
1558                            })?
1559                            .iter()
1560                            .map(|value| {
1561                                value.as_f64().map(|value| value as f32).ok_or_else(|| {
1562                                    crate::RedDBError::Query(
1563                                        "field 'dense' must contain only numbers".to_string(),
1564                                    )
1565                                })
1566                            })
1567                            .collect::<Result<Vec<_>, _>>()?;
1568                    }
1569                }
1570
1571                if !metadata_ops.is_empty() {
1572                    ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1573                    let metadata = patch_metadata.get_or_insert_with(|| {
1574                        store.get_metadata(&collection, id).unwrap_or_default()
1575                    });
1576                    let mut metadata_json = metadata_to_json(metadata);
1577                    apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1578                        .map_err(crate::RedDBError::Query)?;
1579                    *metadata = metadata_from_json(&metadata_json)?;
1580                    metadata_changed = true;
1581                }
1582            }
1583            crate::storage::EntityData::TimeSeries(_)
1584            | crate::storage::EntityData::QueueMessage(_) => {
1585                return Err(crate::RedDBError::Query(
1586                    "patch operations are not supported for TimeSeries or QueueMessage entities"
1587                        .to_string(),
1588                ));
1589            }
1590        }
1591
1592        if let Some(metadata) = payload
1593            .get("metadata")
1594            .and_then(crate::json::Value::as_object)
1595        {
1596            let patch_metadata = patch_metadata
1597                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1598            for (key, value) in metadata {
1599                ensure_non_tree_reserved_metadata_key(key)?;
1600                patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1601            }
1602            metadata_changed = true;
1603        }
1604
1605        for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1606            let patch_metadata = patch_metadata
1607                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1608            if matches!(value, crate::storage::unified::MetadataValue::Null) {
1609                patch_metadata.remove(&key);
1610            } else {
1611                patch_metadata.set(key, value);
1612            }
1613            metadata_changed = true;
1614        }
1615
1616        entity.updated_at = std::time::SystemTime::now()
1617            .duration_since(std::time::UNIX_EPOCH)
1618            .unwrap_or_default()
1619            .as_secs();
1620
1621        modified_columns = dedupe_modified_columns(modified_columns);
1622
1623        Ok(AppliedEntityMutation {
1624            id,
1625            collection,
1626            entity,
1627            metadata: patch_metadata,
1628            modified_columns,
1629            persist_metadata: metadata_changed,
1630            context_index_dirty,
1631            pre_mutation_fields,
1632        })
1633    }
1634
1635    pub(crate) fn apply_loaded_sql_update_row_core(
1636        &self,
1637        collection: String,
1638        mut entity: crate::storage::UnifiedEntity,
1639        static_field_assignments: &[(String, Value)],
1640        dynamic_field_assignments: Vec<(String, Value)>,
1641        static_metadata_assignments: &[(String, MetadataValue)],
1642        dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1643        row_contract_plan: Option<&RowUpdateContractPlan>,
1644        row_modified_columns_template: &[String],
1645        row_touches_unique_columns: bool,
1646    ) -> RedDBResult<AppliedEntityMutation> {
1647        let id = entity.id;
1648        let db = self.db();
1649        let store = db.store();
1650        let Some(_) = store.get_collection(&collection) else {
1651            return Err(crate::RedDBError::NotFound(format!(
1652                "collection not found: {collection}"
1653            )));
1654        };
1655
1656        let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1657        let row_contract_timestamps = row_contract_plan
1658            .map(|plan| plan.timestamps_enabled)
1659            .unwrap_or(false);
1660        let mut metadata_changed = false;
1661        let mut modified_columns = row_modified_columns_template.to_vec();
1662        let mut context_index_dirty = !modified_columns.is_empty();
1663
1664        // Snapshot OLD field values BEFORE applying the assignments —
1665        // the secondary-index maintenance hook needs both before/after to
1666        // delete-then-insert under changed indexed columns.
1667        let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1668
1669        let crate::storage::EntityData::Row(row) = &mut entity.data else {
1670            return Err(crate::RedDBError::Query(
1671                "SQL row update fast path requires a row entity".to_string(),
1672            ));
1673        };
1674
1675        let _ = row_contract_plan;
1676        apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1677        apply_row_field_assignments_raw(row, dynamic_field_assignments);
1678
1679        for (key, value) in static_metadata_assignments
1680            .iter()
1681            .cloned()
1682            .chain(dynamic_metadata_assignments)
1683        {
1684            ensure_non_tree_reserved_metadata_key(&key)?;
1685            patch_metadata
1686                .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1687                .set(key, value);
1688            metadata_changed = true;
1689        }
1690
1691        if !modified_columns.is_empty() || row_contract_timestamps {
1692            let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1693            if row_contract_timestamps {
1694                context_index_dirty = true;
1695                set_row_field(
1696                    row,
1697                    "updated_at",
1698                    Value::UnsignedInteger(current_unix_ms_u64()),
1699                );
1700                modified_columns.push("updated_at".to_string());
1701            }
1702            if row_touches_unique_columns {
1703                let current_fields = collect_row_fields(row);
1704                contract.enforce_row_uniqueness(&current_fields, Some(id))?;
1705            }
1706        }
1707
1708        entity.updated_at = std::time::SystemTime::now()
1709            .duration_since(std::time::UNIX_EPOCH)
1710            .unwrap_or_default()
1711            .as_secs();
1712
1713        modified_columns = dedupe_modified_columns(modified_columns);
1714
1715        Ok(AppliedEntityMutation {
1716            id,
1717            collection,
1718            entity,
1719            metadata: patch_metadata,
1720            modified_columns,
1721            persist_metadata: metadata_changed,
1722            context_index_dirty,
1723            pre_mutation_fields,
1724        })
1725    }
1726
1727    pub(crate) fn persist_applied_entity_mutations(
1728        &self,
1729        applied: &[AppliedEntityMutation],
1730    ) -> RedDBResult<()> {
1731        if applied.is_empty() {
1732            return Ok(());
1733        }
1734
1735        let store = self.db().store();
1736        let collection = &applied[0].collection;
1737        let Some(manager) = store.get_collection(collection) else {
1738            return Err(crate::RedDBError::NotFound(format!(
1739                "collection not found: {collection}"
1740            )));
1741        };
1742
1743        manager
1744            .update_hot_batch_with_metadata(applied.iter().map(|item| {
1745                (
1746                    &item.entity,
1747                    item.modified_columns.as_slice(),
1748                    if item.persist_metadata {
1749                        item.metadata.as_ref()
1750                    } else {
1751                        None
1752                    },
1753                )
1754            }))
1755            .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1756
1757        // PG-HOT-like fast path: segment in-place is done; when no
1758        // mutation touches a secondary-indexed column AND no metadata
1759        // payload needs to be folded into a B-tree record, skip the
1760        // in-line B-tree upsert. The WAL still records the update
1761        // (durability preserved; recovery replay rebuilds the B-tree),
1762        // and `manager.get()` prefers the live segment over the
1763        // B-tree for reads — so the short-circuit is invisible to
1764        // callers. See `persist_entities_to_pager_wal_only`.
1765        let indexed_cols = self
1766            .index_store_ref()
1767            .indexed_columns_set(collection.as_str());
1768        let all_hot = !indexed_cols.is_empty()
1769            && applied.iter().all(|item| {
1770                !item.persist_metadata
1771                    && !item
1772                        .modified_columns
1773                        .iter()
1774                        .any(|c| indexed_cols.contains(c))
1775            })
1776            || indexed_cols.is_empty() && applied.iter().all(|item| !item.persist_metadata);
1777
1778        // Pass `&[&UnifiedEntity]` — no per-entity clone. The SQL UPDATE
1779        // inner loop hands us `applied` which already owns the post-image
1780        // entity; all we need for the persist path is a read borrow.
1781        let entity_refs: Vec<&crate::storage::UnifiedEntity> =
1782            applied.iter().map(|item| &item.entity).collect();
1783        let persist_fn = if all_hot {
1784            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
1785        } else {
1786            crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
1787        };
1788        persist_fn(store.as_ref(), collection, &entity_refs)
1789            .map_err(|err| crate::RedDBError::Internal(err.to_string()))
1790    }
1791
1792    pub(crate) fn flush_applied_entity_mutation(
1793        &self,
1794        applied: &AppliedEntityMutation,
1795    ) -> RedDBResult<()> {
1796        let store = self.db().store();
1797        if applied.context_index_dirty {
1798            store
1799                .context_index()
1800                .index_entity(&applied.collection, &applied.entity);
1801        }
1802        // Secondary-index maintenance for SQL UPDATE / JSON-Patch flows.
1803        // Skip when pre_mutation_fields is empty (entity wasn't a row, or
1804        // didn't carry recoverable column names) — there's nothing to
1805        // delete-then-insert in that case.
1806        //
1807        // Also build the CDC damage vector here so downstream consumers
1808        // see which columns changed without re-diffing.
1809        let mut changed_columns: Option<Vec<String>> = None;
1810        if !applied.pre_mutation_fields.is_empty() {
1811            let post = entity_row_fields_snapshot(&applied.entity);
1812            if !post.is_empty() {
1813                let damage = crate::application::entity::row_damage_vector(
1814                    &applied.pre_mutation_fields,
1815                    &post,
1816                );
1817                if !damage.is_empty() {
1818                    changed_columns = Some(
1819                        damage
1820                            .touched_columns()
1821                            .into_iter()
1822                            .map(str::to_string)
1823                            .collect(),
1824                    );
1825                }
1826
1827                // HOT-like fast path (P3.T2/T3): when no modified
1828                // column is covered by a secondary index, skip the
1829                // `index_entity_update` call entirely. The function
1830                // would short-circuit internally, but the call still
1831                // reads the registry lock + walks the damage vector
1832                // — avoiding it saves a few microseconds per UPDATE.
1833                // Page-local replace + t_ctid chain support (true
1834                // HOT) lives in a follow-up storage spec.
1835                let indexed_cols: std::collections::HashSet<String> = self
1836                    .index_store_ref()
1837                    .list_indices(applied.collection.as_str())
1838                    .into_iter()
1839                    .filter_map(|idx| idx.columns.first().cloned())
1840                    .collect();
1841                let modified_cols: std::collections::HashSet<String> = damage
1842                    .touched_columns()
1843                    .into_iter()
1844                    .map(str::to_string)
1845                    .collect();
1846                let decision = crate::storage::engine::hot_update::decide(
1847                    &crate::storage::engine::hot_update::HotUpdateInputs {
1848                        collection: applied.collection.as_str(),
1849                        indexed_columns: &indexed_cols,
1850                        modified_columns: &modified_cols,
1851                        // The storage layer currently handles fit via
1852                        // the segment abstraction; we bypass the
1853                        // page-size check here.
1854                        new_tuple_size: 0,
1855                        page_free_space: usize::MAX,
1856                    },
1857                );
1858                if !decision.can_hot {
1859                    self.index_store_ref()
1860                        .index_entity_update(
1861                            &applied.collection,
1862                            applied.id,
1863                            &applied.pre_mutation_fields,
1864                            &post,
1865                        )
1866                        .map_err(crate::RedDBError::Internal)?;
1867                } else {
1868                    // F-04: `applied.collection` is tenant-supplied;
1869                    // strip CR/LF/control bytes via the LogField
1870                    // escaper (ADR 0010).
1871                    tracing::debug!(
1872                        collection = %reddb_wire::audit_safe_log_field(&applied.collection),
1873                        "hot_update fast-path: skipped index_entity_update"
1874                    );
1875                }
1876            }
1877        }
1878        self.cdc_emit_prebuilt_with_columns(
1879            crate::replication::cdc::ChangeOperation::Update,
1880            &applied.collection,
1881            &applied.entity,
1882            "entity",
1883            applied.metadata.as_ref(),
1884            true,
1885            changed_columns,
1886        );
1887        Ok(())
1888    }
1889
1890    pub(crate) fn apply_loaded_patch_entity(
1891        &self,
1892        collection: String,
1893        entity: crate::storage::UnifiedEntity,
1894        payload: JsonValue,
1895        operations: Vec<PatchEntityOperation>,
1896    ) -> RedDBResult<CreateEntityOutput> {
1897        let applied =
1898            self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
1899        self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
1900        self.flush_applied_entity_mutation(&applied)?;
1901        Ok(CreateEntityOutput {
1902            id: applied.id,
1903            entity: Some(applied.entity),
1904        })
1905    }
1906}
1907
1908fn ensure_non_tree_reserved_metadata_patch_paths(
1909    operations: &[PatchEntityOperation],
1910) -> RedDBResult<()> {
1911    for operation in operations {
1912        let Some(key) = operation.path.first().map(String::as_str) else {
1913            continue;
1914        };
1915        ensure_non_tree_reserved_metadata_key(key)?;
1916    }
1917    Ok(())
1918}
1919
1920fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
1921    if key.starts_with(TREE_METADATA_PREFIX) {
1922        return Err(crate::RedDBError::Query(format!(
1923            "metadata key '{}' is reserved for managed trees",
1924            key
1925        )));
1926    }
1927    Ok(())
1928}
1929
1930fn ensure_non_tree_reserved_metadata_entries(
1931    metadata: &[(String, MetadataValue)],
1932) -> RedDBResult<()> {
1933    for (key, _) in metadata {
1934        ensure_non_tree_reserved_metadata_key(key)?;
1935    }
1936    Ok(())
1937}
1938
1939fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
1940    if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
1941        return Err(crate::RedDBError::Query(format!(
1942            "edge label '{}' is reserved for managed trees",
1943            TREE_CHILD_EDGE_LABEL
1944        )));
1945    }
1946    Ok(())
1947}
1948
1949impl RedDBRuntime {
1950    pub(crate) fn create_node_unchecked(
1951        &self,
1952        input: CreateNodeInput,
1953    ) -> RedDBResult<CreateEntityOutput> {
1954        let db = self.db();
1955        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
1956        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
1957        let mut metadata = input.metadata;
1958        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
1959        let mut builder = db.node(&input.collection, &input.label);
1960
1961        if let Some(node_type) = input.node_type {
1962            builder = builder.node_type(node_type);
1963        }
1964
1965        for (key, value) in input.properties {
1966            builder = builder.property(key, value);
1967        }
1968
1969        for (key, value) in metadata {
1970            builder = builder.metadata(key, value);
1971        }
1972
1973        for embedding in input.embeddings {
1974            if let Some(model) = embedding.model {
1975                builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
1976            } else {
1977                builder = builder.embedding(embedding.name, embedding.vector);
1978            }
1979        }
1980
1981        for link in input.table_links {
1982            builder = builder.link_to_table(link.key, link.table);
1983        }
1984
1985        for link in input.node_links {
1986            builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
1987        }
1988
1989        let id = builder.save()?;
1990        // Phase 1.1 MVCC universal: stamp xmin so concurrent snapshots
1991        // don't see this node until the transaction commits.
1992        self.stamp_xmin_if_in_txn(&input.collection, id);
1993        refresh_context_index(&db, &input.collection, id)?;
1994        self.cdc_emit(
1995            crate::replication::cdc::ChangeOperation::Insert,
1996            &input.collection,
1997            id.raw(),
1998            "graph_node",
1999        );
2000        Ok(CreateEntityOutput {
2001            id,
2002            entity: db.store().get(&input.collection, id),
2003        })
2004    }
2005
2006    pub(crate) fn create_edge_unchecked(
2007        &self,
2008        input: CreateEdgeInput,
2009    ) -> RedDBResult<CreateEntityOutput> {
2010        let db = self.db();
2011        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2012        contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2013        let mut metadata = input.metadata;
2014        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2015        let mut builder = db
2016            .edge(&input.collection, &input.label)
2017            .from(input.from)
2018            .to(input.to);
2019
2020        if let Some(weight) = input.weight {
2021            builder = builder.weight(weight);
2022        }
2023
2024        for (key, value) in input.properties {
2025            builder = builder.property(key, value);
2026        }
2027
2028        for (key, value) in metadata {
2029            builder = builder.metadata(key, value);
2030        }
2031
2032        let id = builder.save()?;
2033        // Phase 1.1 MVCC universal: stamp xmin on the edge so other
2034        // sessions don't follow it until COMMIT.
2035        self.stamp_xmin_if_in_txn(&input.collection, id);
2036        refresh_context_index(&db, &input.collection, id)?;
2037        self.cdc_emit(
2038            crate::replication::cdc::ChangeOperation::Insert,
2039            &input.collection,
2040            id.raw(),
2041            "graph_edge",
2042        );
2043        Ok(CreateEntityOutput {
2044            id,
2045            entity: db.store().get(&input.collection, id),
2046        })
2047    }
2048}
2049
2050impl RuntimeEntityPort for RedDBRuntime {
2051    fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2052        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2053        let db = self.db();
2054        let CreateRowInput {
2055            collection,
2056            fields,
2057            metadata: input_metadata,
2058            node_links,
2059            vector_links,
2060        } = input;
2061        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2062        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2063        let mut metadata = input_metadata;
2064        apply_collection_default_ttl(&db, &collection, &mut metadata);
2065        let fields = contract.normalize_insert_fields(fields)?;
2066        contract.enforce_row_uniqueness(&fields, None)?;
2067        // Route through MutationEngine for unified hot path.
2068        let engine = self.mutation_engine();
2069        let result = engine.apply(
2070            collection.clone(),
2071            vec![crate::runtime::mutation::MutationRow {
2072                fields,
2073                metadata,
2074                node_links,
2075                vector_links,
2076            }],
2077        )?;
2078        let id = result.ids[0];
2079        // Perf: `db.get(id)` does a *cross-collection* scan (get_any)
2080        // that also takes a write lock on the entity cache. We know
2081        // the collection — hit the manager directly. Cuts
2082        // create_row() p50 roughly in half on the hot path.
2083        Ok(CreateEntityOutput {
2084            id,
2085            entity: db.store().get(&collection, id),
2086        })
2087    }
2088
2089    fn create_rows_batch(
2090        &self,
2091        input: CreateRowsBatchInput,
2092    ) -> RedDBResult<Vec<CreateEntityOutput>> {
2093        if input.rows.is_empty() {
2094            return Ok(Vec::new());
2095        }
2096        self.check_batch_size(input.rows.len())?;
2097        self.check_db_size()?;
2098
2099        let db = self.db();
2100        let collection = input.collection;
2101        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2102        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2103
2104        let mut prepared_rows = Vec::with_capacity(input.rows.len());
2105        let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2106        for row in input.rows {
2107            if row.collection != collection {
2108                return Err(crate::RedDBError::Query(format!(
2109                    "batch row collection mismatch: expected '{}', got '{}'",
2110                    collection, row.collection
2111                )));
2112            }
2113
2114            let mut metadata = row.metadata;
2115            apply_collection_default_ttl(&db, &collection, &mut metadata);
2116            let fields = contract.normalize_insert_fields(row.fields)?;
2117            contract.enforce_row_uniqueness(&fields, None)?;
2118            uniqueness_rows.push(fields.clone());
2119            prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2120        }
2121
2122        contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2123
2124        // Route through MutationEngine: single bulk_insert + one CDC batch
2125        // instead of N separate cdc_emit() calls (each acquires a write lock).
2126        let engine = {
2127            let e = self.mutation_engine();
2128            if input.suppress_events {
2129                e.with_suppress_events()
2130            } else {
2131                e
2132            }
2133        };
2134        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2135            .into_iter()
2136            .map(|(fields, metadata, node_links, vector_links)| {
2137                crate::runtime::mutation::MutationRow {
2138                    fields,
2139                    metadata,
2140                    node_links,
2141                    vector_links,
2142                }
2143            })
2144            .collect();
2145
2146        let result = engine
2147            .apply(collection.clone(), mutation_rows)
2148            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2149
2150        let store = db.store();
2151        Ok(result
2152            .ids
2153            .into_iter()
2154            .map(|id| CreateEntityOutput {
2155                id,
2156                entity: store.get(&collection, id),
2157            })
2158            .collect())
2159    }
2160
2161    fn create_rows_batch_prevalidated_columnar(
2162        &self,
2163        collection: String,
2164        column_names: std::sync::Arc<Vec<String>>,
2165        rows: Vec<Vec<crate::storage::schema::Value>>,
2166    ) -> RedDBResult<usize> {
2167        use crate::storage::{
2168            unified::{EntityData, EntityKind, RowData},
2169            EntityId, UnifiedEntity,
2170        };
2171        use std::sync::Arc;
2172
2173        if rows.is_empty() {
2174            return Ok(0);
2175        }
2176        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2177        self.check_batch_size(rows.len())?;
2178        self.check_db_size()?;
2179
2180        let db = self.db();
2181        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2182        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2183
2184        let store = db.store();
2185        let ncols = column_names.len();
2186        let table_arc: Arc<str> = Arc::from(collection.as_str());
2187
2188        // If the collection has any registered secondary index, snapshot
2189        // each row's (column, value) pairs *before* the values move into
2190        // entities. We need them for `index_entity_insert_batch` after
2191        // `bulk_insert` returns the assigned IDs. When no indexes are
2192        // registered the snapshot is pure waste — gate it. Mirrors the
2193        // `has_secondary_indexes` short-circuit in MutationEngine.
2194        let indexed_cols = self
2195            .index_store_ref()
2196            .indexed_columns_set(collection.as_str());
2197        let has_secondary_indexes = !indexed_cols.is_empty();
2198        let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2199            if has_secondary_indexes {
2200                Vec::with_capacity(rows.len())
2201            } else {
2202                Vec::new()
2203            };
2204
2205        // Build `UnifiedEntity`s directly in columnar form — no
2206        // HashMap<String, Value> per row, no (String, Value) tuples,
2207        // no named→columnar conversion pass inside the manager. Every
2208        // entity shares the same `Arc<Vec<String>>` schema; the row
2209        // payload is exactly the `Vec<Value>` the wire handed us.
2210        let entities: Vec<UnifiedEntity> = rows
2211            .into_iter()
2212            .map(|values| {
2213                if values.len() != ncols {
2214                    // Row width mismatch — shouldn't happen after wire
2215                    // decode, but guard against it. Fall back to zero
2216                    // so the caller sees the error.
2217                }
2218                if has_secondary_indexes {
2219                    // Only snapshot indexed columns to keep alloc cost
2220                    // proportional to index count, not row width.
2221                    let mut snap: Vec<(String, crate::storage::schema::Value)> =
2222                        Vec::with_capacity(indexed_cols.len());
2223                    for (name, val) in column_names.iter().zip(values.iter()) {
2224                        if indexed_cols.contains(name) {
2225                            snap.push((name.clone(), val.clone()));
2226                        }
2227                    }
2228                    field_snapshots.push(snap);
2229                }
2230                let mut row = RowData::new(values);
2231                row.schema = Some(Arc::clone(&column_names));
2232                UnifiedEntity::new(
2233                    EntityId::new(0),
2234                    EntityKind::TableRow {
2235                        table: Arc::clone(&table_arc),
2236                        row_id: 0,
2237                    },
2238                    EntityData::Row(row),
2239                )
2240            })
2241            .collect();
2242
2243        let ids = store
2244            .bulk_insert(&collection, entities)
2245            .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2246
2247        // Secondary-index maintenance — required for post-CREATE-INDEX
2248        // inserts to stay visible to indexed queries. Without this, the
2249        // mini-duel `mixed_workload_indexed` scenario sees ~half its
2250        // post-seed inserts disappear from `select_filtered`.
2251        if has_secondary_indexes {
2252            let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2253                .iter()
2254                .zip(field_snapshots)
2255                .map(|(id, fields)| (*id, fields))
2256                .collect();
2257            self.index_store_ref()
2258                .index_entity_insert_batch(&collection, &index_rows)
2259                .map_err(crate::RedDBError::Internal)?;
2260        }
2261
2262        // CDC + cache invalidation — one call, not N (same pattern
2263        // as MutationEngine::append_batch).
2264        self.invalidate_result_cache();
2265        self.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2266
2267        Ok(ids.len())
2268    }
2269
2270    fn create_rows_batch_columnar(
2271        &self,
2272        collection: String,
2273        column_names: std::sync::Arc<Vec<String>>,
2274        rows: Vec<Vec<crate::storage::schema::Value>>,
2275    ) -> RedDBResult<usize> {
2276        if rows.is_empty() {
2277            return Ok(0);
2278        }
2279        self.check_batch_size(rows.len())?;
2280        self.check_db_size()?;
2281
2282        let db = self.db();
2283        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2284        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2285
2286        // Fast path: when the collection carries no contract (or the
2287        // contract has no declared columns) `normalize_insert_fields`
2288        // is a no-op and `enforce_row_uniqueness` finds no unique
2289        // columns to check. Skip the per-row `(String, Value)` tuple
2290        // materialisation entirely and route through the prevalidated
2291        // columnar kernel — same write, CDC, and index path, just
2292        // without the wasted (String, Value) clones. This is the
2293        // bench `bench_users` shape (no contract declared by the
2294        // adapter's `setup_schema`).
2295        let needs_normalisation = match db.collection_contract(&collection) {
2296            Some(c) => {
2297                c.declared_model == crate::catalog::CollectionModel::Table
2298                    && (!c.declared_columns.is_empty()
2299                        || c.table_def
2300                            .as_ref()
2301                            .map(|t| !t.columns.is_empty())
2302                            .unwrap_or(false))
2303            }
2304            None => false,
2305        };
2306        if !needs_normalisation {
2307            return self.create_rows_batch_prevalidated_columnar(collection, column_names, rows);
2308        }
2309
2310        // Slow path: contract requires per-row normalisation /
2311        // uniqueness checks. Materialise `Vec<(String, Value)>` from
2312        // the columnar layout (this still pays N×ncols `String::clone`
2313        // — but it's deferred out of the wire decoder hot loop and
2314        // happens behind a runtime API, not the per-row decode loop)
2315        // and fall through to the existing `create_rows_batch` path.
2316        let ncols = column_names.len();
2317        let tuple_rows: Vec<CreateRowInput> = rows
2318            .into_iter()
2319            .map(|values| {
2320                let mut fields: Vec<(String, crate::storage::schema::Value)> =
2321                    Vec::with_capacity(ncols);
2322                for (name, value) in column_names.iter().zip(values) {
2323                    fields.push((name.clone(), value));
2324                }
2325                CreateRowInput {
2326                    collection: collection.clone(),
2327                    fields,
2328                    metadata: Vec::new(),
2329                    node_links: Vec::new(),
2330                    vector_links: Vec::new(),
2331                }
2332            })
2333            .collect();
2334        self.create_rows_batch(CreateRowsBatchInput {
2335            collection,
2336            rows: tuple_rows,
2337            suppress_events: false,
2338        })
2339        .map(|out| out.len())
2340    }
2341
2342    fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2343        if input.rows.is_empty() {
2344            return Ok(0);
2345        }
2346        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2347        self.check_batch_size(input.rows.len())?;
2348        self.check_db_size()?;
2349
2350        let db = self.db();
2351        let collection = input.collection;
2352        // Still verify the collection's declared model before we blast
2353        // rows at it — this one-off check is O(1), independent of
2354        // ncols, and catches schema-kind mismatches that the client
2355        // can't always see.
2356        let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2357        contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2358
2359        // Hoist the per-collection default TTL lookup out of the
2360        // per-row loop — it only depends on the collection, not on
2361        // individual rows, and the old path did one HashMap read per
2362        // row (25k reads for a 25k typed_insert bulk). Fetch it once,
2363        // apply to any row whose metadata doesn't already carry a TTL.
2364        let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2365
2366        let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2367            Vec::with_capacity(input.rows.len());
2368        let mut mutation_rows = mutation_rows;
2369        for row in input.rows {
2370            if row.collection != collection {
2371                return Err(crate::RedDBError::Query(format!(
2372                    "batch row collection mismatch: expected '{}', got '{}'",
2373                    collection, row.collection
2374                )));
2375            }
2376            let mut metadata = row.metadata;
2377            if let Some(ttl) = default_ttl_ms {
2378                if !has_internal_ttl_metadata(&metadata) {
2379                    metadata.push((
2380                        "_ttl_ms".to_string(),
2381                        if ttl <= i64::MAX as u64 {
2382                            MetadataValue::Int(ttl as i64)
2383                        } else {
2384                            MetadataValue::Timestamp(ttl)
2385                        },
2386                    ));
2387                }
2388            }
2389            mutation_rows.push(crate::runtime::mutation::MutationRow {
2390                fields: row.fields,
2391                metadata,
2392                node_links: row.node_links,
2393                vector_links: row.vector_links,
2394            });
2395        }
2396
2397        let engine = self.mutation_engine();
2398        let result = engine
2399            .apply(collection, mutation_rows)
2400            .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2401        Ok(result.ids.len())
2402    }
2403
2404    fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2405        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2406        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2407        self.create_node_unchecked(input)
2408    }
2409
2410    fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2411        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2412        ensure_non_tree_structural_edge_label(&input.label)?;
2413        ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2414        self.create_edge_unchecked(input)
2415    }
2416
2417    fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2418        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2419        let db = self.db();
2420        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2421        contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2422        let mut metadata = input.metadata;
2423        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2424        let mut builder = db.vector(&input.collection).dense(input.dense);
2425
2426        if let Some(content) = input.content {
2427            builder = builder.content(content);
2428        }
2429
2430        for (key, value) in metadata {
2431            builder = builder.metadata(key, value);
2432        }
2433
2434        if let Some(link_row) = input.link_row {
2435            builder = builder.link_to_table(link_row);
2436        }
2437
2438        if let Some(link_node) = input.link_node {
2439            builder = builder.link_to_node(link_node);
2440        }
2441
2442        let id = builder.save()?;
2443        // Phase 1.1 MVCC universal: stamp xmin on the vector so
2444        // concurrent ANN scans hide it until the transaction commits.
2445        self.stamp_xmin_if_in_txn(&input.collection, id);
2446        refresh_context_index(&db, &input.collection, id)?;
2447        self.cdc_emit(
2448            crate::replication::cdc::ChangeOperation::Insert,
2449            &input.collection,
2450            id.raw(),
2451            "vector",
2452        );
2453        Ok(CreateEntityOutput {
2454            id,
2455            entity: db.store().get(&input.collection, id),
2456        })
2457    }
2458
2459    fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2460        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2461        let db = self.db();
2462        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2463        contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2464
2465        // Serialize the full body as Value::Json for the "body" field
2466        let body_bytes = json_to_vec(&input.body).map_err(|err| {
2467            crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2468        })?;
2469        let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2470            "body".to_string(),
2471            crate::storage::schema::Value::Json(body_bytes),
2472        )];
2473
2474        // Flatten top-level keys from the body into named fields for filtering
2475        if let JsonValue::Object(ref map) = input.body {
2476            for (key, value) in map {
2477                let storage_value = json_to_storage_value(value)?;
2478                fields.push((key.clone(), storage_value));
2479            }
2480        }
2481
2482        let mut metadata = input.metadata;
2483        apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2484        let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2485            .iter()
2486            .map(|(key, value)| (key.as_str(), value.clone()))
2487            .collect();
2488        let mut builder = db.row(&input.collection, columns);
2489
2490        for (key, value) in metadata {
2491            builder = builder.metadata(key, value);
2492        }
2493
2494        for node in input.node_links {
2495            builder = builder.link_to_node(node);
2496        }
2497
2498        for vector in input.vector_links {
2499            builder = builder.link_to_vector(vector);
2500        }
2501
2502        let id = builder.save()?;
2503        // Phase 1.1 MVCC universal: stamp xmin on the document.
2504        self.stamp_xmin_if_in_txn(&input.collection, id);
2505        refresh_context_index(&db, &input.collection, id)?;
2506        self.cdc_emit(
2507            crate::replication::cdc::ChangeOperation::Insert,
2508            &input.collection,
2509            id.raw(),
2510            "document",
2511        );
2512        Ok(CreateEntityOutput {
2513            id,
2514            entity: db.store().get(&input.collection, id),
2515        })
2516    }
2517
2518    fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2519        let db = self.db();
2520        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2521        let declared_model = db
2522            .collection_contract(&input.collection)
2523            .map(|contract| contract.declared_model);
2524        let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2525            contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2526            self.seal_vault_value(&input.collection, input.value)?
2527        } else {
2528            contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2529            input.value
2530        };
2531        let fields = vec![
2532            (
2533                "key".to_string(),
2534                crate::storage::schema::Value::text(input.key),
2535            ),
2536            ("value".to_string(), value),
2537        ];
2538        let collection = input.collection;
2539        let result = self.mutation_engine().apply(
2540            collection.clone(),
2541            vec![crate::runtime::mutation::MutationRow {
2542                fields,
2543                metadata: input.metadata,
2544                node_links: Vec::new(),
2545                vector_links: Vec::new(),
2546            }],
2547        )?;
2548        let id = result.ids[0];
2549        Ok(CreateEntityOutput {
2550            id,
2551            entity: db.store().get(&collection, id),
2552        })
2553    }
2554
2555    fn create_timeseries_point(
2556        &self,
2557        input: CreateTimeSeriesPointInput,
2558    ) -> RedDBResult<CreateEntityOutput> {
2559        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2560        let db = self.db();
2561        let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2562        contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2563
2564        let mut fields = vec![
2565            (
2566                "metric".to_string(),
2567                crate::storage::schema::Value::text(input.metric),
2568            ),
2569            (
2570                "value".to_string(),
2571                crate::storage::schema::Value::Float(input.value),
2572            ),
2573        ];
2574
2575        if let Some(timestamp_ns) = input.timestamp_ns {
2576            fields.push((
2577                "timestamp".to_string(),
2578                crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2579            ));
2580        }
2581
2582        if !input.tags.is_empty() {
2583            let tags_json = JsonValue::Object(
2584                input
2585                    .tags
2586                    .into_iter()
2587                    .map(|(key, value)| (key, JsonValue::String(value)))
2588                    .collect(),
2589            );
2590            let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2591                crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2592            })?;
2593            fields.push((
2594                "tags".to_string(),
2595                crate::storage::schema::Value::Json(tags_bytes),
2596            ));
2597        }
2598
2599        let collection = input.collection;
2600        let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2601        // Phase 1.1 MVCC universal: stamp xmin on the point so
2602        // concurrent range scans hide it until COMMIT.
2603        self.stamp_xmin_if_in_txn(&collection, id);
2604        refresh_context_index(&db, &collection, id)?;
2605
2606        Ok(CreateEntityOutput {
2607            id,
2608            entity: db.store().get(&collection, id),
2609        })
2610    }
2611
2612    fn get_kv(
2613        &self,
2614        collection: &str,
2615        key: &str,
2616    ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2617        let db = self.db();
2618        ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2619        let store = db.store();
2620        let Some(manager) = store.get_collection(collection) else {
2621            return Ok(None);
2622        };
2623        let entities = manager.query_all(|_| true);
2624        for entity in entities {
2625            if let crate::storage::EntityData::Row(ref row) = entity.data {
2626                if let Some(ref named) = row.named {
2627                    if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2628                        if &**k == key {
2629                            let value = named
2630                                .get("value")
2631                                .cloned()
2632                                .unwrap_or(crate::storage::schema::Value::Null);
2633                            return Ok(Some((value, entity.id)));
2634                        }
2635                    }
2636                }
2637            }
2638        }
2639        Ok(None)
2640    }
2641
2642    fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2643        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2644        let found = self.get_kv(collection, key)?;
2645        if let Some((_, id)) = found {
2646            let db = self.db();
2647            let store = db.store();
2648            let deleted = store
2649                .delete(collection, id)
2650                .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2651            if deleted {
2652                store.context_index().remove_entity(id);
2653            }
2654            Ok(deleted)
2655        } else {
2656            Ok(false)
2657        }
2658    }
2659
2660    fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2661        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2662        let PatchEntityInput {
2663            collection,
2664            id,
2665            payload,
2666            operations,
2667        } = input;
2668        let db = self.db();
2669        let store = db.store();
2670        let Some(manager) = store.get_collection(&collection) else {
2671            return Err(crate::RedDBError::NotFound(format!(
2672                "collection not found: {collection}"
2673            )));
2674        };
2675        let Some(entity) = manager.get(id) else {
2676            return Err(crate::RedDBError::NotFound(format!(
2677                "entity not found: {}",
2678                id.raw()
2679            )));
2680        };
2681        self.apply_loaded_patch_entity(collection, entity, payload, operations)
2682    }
2683
2684    fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
2685        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2686        let store = self.db().store();
2687        // Snapshot row fields before delete so we can mirror the removal
2688        // into every secondary index. The fetch is best-effort: if the
2689        // entity is already gone, the delete below is a no-op anyway.
2690        let pre_delete_fields = store
2691            .get(&input.collection, input.id)
2692            .as_ref()
2693            .map(entity_row_fields_snapshot)
2694            .unwrap_or_default();
2695        // Store delete first (source of truth). Crash between here and index removal
2696        // leaves the entity invisible to most queries but recoverable; the reverse
2697        // (remove index first, then crash) leaves the entity permanently orphaned.
2698        let deleted = store
2699            .delete(&input.collection, input.id)
2700            .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2701        if deleted {
2702            store.context_index().remove_entity(input.id);
2703            // Secondary index maintenance — surface only registry-shape
2704            // errors; missing-index removals are tolerated inside the call.
2705            if !pre_delete_fields.is_empty() {
2706                self.index_store_ref()
2707                    .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
2708                    .map_err(crate::RedDBError::Internal)?;
2709            }
2710            self.cdc_emit(
2711                crate::replication::cdc::ChangeOperation::Delete,
2712                &input.collection,
2713                input.id.raw(),
2714                "entity",
2715            );
2716        }
2717        Ok(DeleteEntityOutput {
2718            deleted,
2719            id: input.id,
2720        })
2721    }
2722}