1use std::collections::HashMap;
2
3use crate::application::entity::{
4 AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5 RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8 has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21 db: &crate::storage::unified::devx::RedDB,
22 collection: &str,
23 metadata: &mut Vec<(String, MetadataValue)>,
24) {
25 if has_internal_ttl_metadata(metadata) {
26 return;
27 }
28
29 let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30 return;
31 };
32
33 metadata.push((
34 "_ttl_ms".to_string(),
35 if default_ttl_ms <= i64::MAX as u64 {
36 MetadataValue::Int(default_ttl_ms as i64)
37 } else {
38 MetadataValue::Timestamp(default_ttl_ms)
39 },
40 ));
41}
42
43fn refresh_context_index(
44 db: &crate::storage::unified::devx::RedDB,
45 collection: &str,
46 id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48 let store = db.store();
49 let Some(entity) = store.get(collection, id) else {
50 return Ok(());
51 };
52
53 store.context_index().index_entity(collection, &entity);
54 Ok(())
55}
56
57pub(crate) fn entity_row_fields_snapshot(
63 entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65 let crate::storage::EntityData::Row(row) = &entity.data else {
66 return Vec::new();
67 };
68 if let Some(named) = &row.named {
69 return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70 }
71 if let Some(schema) = &row.schema {
72 return schema
73 .iter()
74 .zip(row.columns.iter())
75 .map(|(name, value)| (name.clone(), value.clone()))
76 .collect();
77 }
78 Vec::new()
79}
80
81fn ensure_collection_model_contract(
82 db: &crate::storage::unified::devx::RedDB,
83 collection: &str,
84 requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86 if let Some(contract) = db.collection_contract(collection) {
87 if !contract_enforces_model(&contract) {
88 return Ok(());
89 }
90 if collection_model_allows(contract.declared_model, requested_model) {
91 return Ok(());
92 }
93 return Err(crate::RedDBError::InvalidOperation(format!(
94 "collection '{}' is declared as '{}' and does not allow '{}' writes",
95 collection,
96 collection_model_name(contract.declared_model),
97 collection_model_name(requested_model)
98 )));
99 }
100
101 let now = implicit_contract_unix_ms();
102 db.save_collection_contract(crate::physical::CollectionContract {
103 name: collection.to_string(),
104 declared_model: requested_model,
105 schema_mode: crate::catalog::SchemaMode::Dynamic,
106 origin: crate::physical::ContractOrigin::Implicit,
107 version: 1,
108 created_at_unix_ms: now,
109 updated_at_unix_ms: now,
110 default_ttl_ms: db.collection_default_ttl_ms(collection),
111 vector_dimension: None,
112 vector_metric: None,
113 context_index_fields: Vec::new(),
114 declared_columns: Vec::new(),
115 table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116 .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117 timestamps_enabled: false,
118 context_index_enabled: false,
119 metrics_raw_retention_ms: None,
120 metrics_rollup_policies: Vec::new(),
121 metrics_tenant_identity: None,
122 metrics_namespace: None,
123 append_only: false,
126 subscriptions: Vec::new(),
127 })
128 .map(|_| ())
129 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
130}
131
132fn implicit_contract_unix_ms() -> u128 {
133 std::time::SystemTime::now()
134 .duration_since(std::time::UNIX_EPOCH)
135 .unwrap_or_default()
136 .as_millis()
137}
138
139fn collection_model_allows(
140 declared_model: crate::catalog::CollectionModel,
141 requested_model: crate::catalog::CollectionModel,
142) -> bool {
143 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
144}
145
146fn ensure_vector_dimension_contract(
147 db: &crate::storage::unified::devx::RedDB,
148 collection: &str,
149 actual_dimension: usize,
150) -> RedDBResult<()> {
151 let Some(expected_dimension) = db
152 .collection_contract(collection)
153 .and_then(|contract| contract.vector_dimension)
154 else {
155 return Ok(());
156 };
157 if expected_dimension == actual_dimension {
158 return Ok(());
159 }
160 Err(crate::RedDBError::Query(format!(
161 "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
162 )))
163}
164
165fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
166 match model {
167 crate::catalog::CollectionModel::Table => "table",
168 crate::catalog::CollectionModel::Document => "document",
169 crate::catalog::CollectionModel::Graph => "graph",
170 crate::catalog::CollectionModel::Vector => "vector",
171 crate::catalog::CollectionModel::Hll => "hll",
172 crate::catalog::CollectionModel::Sketch => "sketch",
173 crate::catalog::CollectionModel::Filter => "filter",
174 crate::catalog::CollectionModel::Kv => "kv",
175 crate::catalog::CollectionModel::Config => "config",
176 crate::catalog::CollectionModel::Vault => "vault",
177 crate::catalog::CollectionModel::Mixed => "mixed",
178 crate::catalog::CollectionModel::TimeSeries => "timeseries",
179 crate::catalog::CollectionModel::Queue => "queue",
180 crate::catalog::CollectionModel::Metrics => "metrics",
181 }
182}
183
184#[derive(Clone)]
185struct UniquenessRule {
186 name: String,
187 columns: Vec<String>,
188 primary_key: bool,
189}
190
191#[derive(Copy, Clone, PartialEq, Eq)]
192enum NormalizeMode {
193 Insert,
197 Update,
201}
202
203mod collection_contract_enforcement {
204 use super::*;
205
206 pub(super) struct CollectionContractWriteEnforcer<'a> {
207 db: &'a crate::storage::unified::devx::RedDB,
208 collection: &'a str,
209 }
210
211 impl<'a> CollectionContractWriteEnforcer<'a> {
212 pub(super) fn new(
213 db: &'a crate::storage::unified::devx::RedDB,
214 collection: &'a str,
215 ) -> Self {
216 Self { db, collection }
217 }
218
219 pub(super) fn ensure_model(
220 &self,
221 requested_model: crate::catalog::CollectionModel,
222 ) -> RedDBResult<()> {
223 ensure_collection_model_contract(self.db, self.collection, requested_model)
224 }
225
226 pub(super) fn normalize_insert_fields(
227 &self,
228 fields: Vec<(String, Value)>,
229 ) -> RedDBResult<Vec<(String, Value)>> {
230 normalize_row_fields_for_contract_with_mode(
231 self.db,
232 self.collection,
233 fields,
234 NormalizeMode::Insert,
235 )
236 }
237
238 pub(super) fn normalize_update_fields(
239 &self,
240 fields: Vec<(String, Value)>,
241 ) -> RedDBResult<Vec<(String, Value)>> {
242 normalize_row_fields_for_contract_with_mode(
243 self.db,
244 self.collection,
245 fields,
246 NormalizeMode::Update,
247 )
248 }
249
250 pub(super) fn enforce_row_uniqueness(
251 &self,
252 fields: &[(String, Value)],
253 exclude_id: Option<crate::storage::EntityId>,
254 ) -> RedDBResult<()> {
255 enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
256 }
257
258 pub(super) fn enforce_batch_uniqueness(
259 &self,
260 rows: &[Vec<(String, Value)>],
261 ) -> RedDBResult<()> {
262 enforce_row_batch_uniqueness(self.db, self.collection, rows)
263 }
264
265 pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
266 row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
267 }
268 }
269}
270
271use collection_contract_enforcement::CollectionContractWriteEnforcer;
272
273fn normalize_row_fields_for_contract_with_mode(
274 db: &crate::storage::unified::devx::RedDB,
275 collection: &str,
276 fields: Vec<(String, Value)>,
277 mode: NormalizeMode,
278) -> RedDBResult<Vec<(String, Value)>> {
279 let Some(contract) = db.collection_contract(collection) else {
280 return Ok(fields);
281 };
282
283 if contract.declared_model != crate::catalog::CollectionModel::Table
284 || (contract.declared_columns.is_empty()
285 && contract
286 .table_def
287 .as_ref()
288 .map(|table| table.columns.is_empty())
289 .unwrap_or(true))
290 {
291 return Ok(fields);
292 }
293
294 let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
304 fields
305 .iter()
306 .find(|(n, _)| n == "created_at")
307 .map(|(_, v)| v.clone())
308 } else {
309 None
310 };
311
312 if contract.timestamps_enabled && mode == NormalizeMode::Insert {
317 for (name, _) in &fields {
318 if name == "created_at" || name == "updated_at" {
319 return Err(crate::RedDBError::Query(format!(
320 "collection '{}' manages '{}' automatically — do not set it in INSERT",
321 collection, name
322 )));
323 }
324 }
325 }
326
327 let mut provided = std::collections::BTreeMap::new();
328 for (name, value) in &fields {
329 provided.insert(name.clone(), value.clone());
330 }
331
332 let resolved_columns = resolved_contract_columns(&contract)?;
333 let declared_names: std::collections::BTreeSet<String> = resolved_columns
334 .iter()
335 .map(|column| column.name.clone())
336 .collect();
337 let unknown_fields: Vec<String> = fields
338 .iter()
339 .filter(|(name, _)| !declared_names.contains(name))
340 .map(|(name, _)| name.clone())
341 .collect();
342 if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
343 && !unknown_fields.is_empty()
344 {
345 return Err(crate::RedDBError::Query(format!(
346 "collection '{}' is strict and does not allow undeclared fields: {}",
347 collection,
348 unknown_fields.join(", ")
349 )));
350 }
351 let mut normalized = Vec::new();
352 let now_ms = current_unix_ms_u64();
353
354 for column in &resolved_columns {
355 match provided.remove(&column.name) {
356 Some(value) => {
357 if contract.timestamps_enabled && mode == NormalizeMode::Update {
362 match column.name.as_str() {
363 "created_at" => {
364 normalized.push((
365 column.name.clone(),
366 existing_created_at
367 .clone()
368 .unwrap_or(Value::UnsignedInteger(now_ms)),
369 ));
370 continue;
371 }
372 "updated_at" => {
373 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
374 continue;
375 }
376 _ => {}
377 }
378 }
379 normalized.push((
380 column.name.clone(),
381 normalize_contract_value(collection, column, value)?,
382 ));
383 }
384 None => {
385 if contract.timestamps_enabled
389 && (column.name == "created_at" || column.name == "updated_at")
390 {
391 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
392 continue;
393 }
394 if let Some(default) = &column.default {
395 normalized.push((
396 column.name.clone(),
397 coerce_contract_literal(collection, &column.name, column, default)?,
398 ));
399 } else if column.not_null {
400 return Err(crate::RedDBError::Query(format!(
401 "missing required column '{}' for collection '{}'",
402 column.name, collection
403 )));
404 }
405 }
406 }
407 }
408
409 for (name, value) in fields {
410 if !declared_names.contains(&name) {
411 normalized.push((name, value));
412 }
413 }
414
415 Ok(normalized)
416}
417
418fn current_unix_ms_u64() -> u64 {
419 std::time::SystemTime::now()
420 .duration_since(std::time::UNIX_EPOCH)
421 .map(|d| d.as_millis() as u64)
422 .unwrap_or(0)
423}
424
425fn enforce_row_uniqueness(
426 db: &crate::storage::unified::devx::RedDB,
427 collection: &str,
428 fields: &[(String, Value)],
429 exclude_id: Option<crate::storage::EntityId>,
430) -> RedDBResult<()> {
431 let Some(contract) = db.collection_contract(collection) else {
432 return Ok(());
433 };
434 if contract.declared_model != crate::catalog::CollectionModel::Table
435 && contract.declared_model != crate::catalog::CollectionModel::Mixed
436 {
437 return Ok(());
438 }
439
440 let rules = resolved_uniqueness_rules(&contract);
441 if rules.is_empty() {
442 return Ok(());
443 }
444
445 let store = db.store();
446 let Some(manager) = store.get_collection(collection) else {
447 return Ok(());
448 };
449
450 let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
451
452 for rule in &rules {
453 let mut expected_signatures = Vec::new();
454 let mut skip_rule = false;
455
456 for column in &rule.columns {
457 match input_fields.get(column) {
458 Some(Value::Null) | None if rule.primary_key => {
459 return Err(crate::RedDBError::Query(format!(
460 "primary key '{}' in collection '{}' requires non-null column '{}'",
461 rule.name, collection, column
462 )))
463 }
464 Some(Value::Null) | None => {
465 skip_rule = true;
466 break;
467 }
468 Some(value) => {
469 expected_signatures.push((column.clone(), value_signature(value)));
470 }
471 }
472 }
473
474 if skip_rule {
475 continue;
476 }
477
478 for entity in manager.query_all(|_| true) {
479 if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
480 continue;
481 }
482 let Some(existing_fields) = row_fields_from_entity(&entity) else {
483 continue;
484 };
485
486 let duplicate = expected_signatures.iter().all(|(column, expected)| {
487 existing_fields
488 .get(column)
489 .map(|value| value_signature(value) == *expected)
490 .unwrap_or(false)
491 });
492
493 if duplicate {
494 let qualifier = if rule.primary_key {
495 "primary key"
496 } else {
497 "unique constraint"
498 };
499 return Err(crate::RedDBError::Query(format!(
500 "{} '{}' violated on collection '{}' for columns [{}]",
501 qualifier,
502 rule.name,
503 collection,
504 rule.columns.join(", ")
505 )));
506 }
507 }
508 }
509
510 Ok(())
511}
512
513fn enforce_row_batch_uniqueness(
514 db: &crate::storage::unified::devx::RedDB,
515 collection: &str,
516 rows: &[Vec<(String, Value)>],
517) -> RedDBResult<()> {
518 let Some(contract) = db.collection_contract(collection) else {
519 return Ok(());
520 };
521 if contract.declared_model != crate::catalog::CollectionModel::Table
522 && contract.declared_model != crate::catalog::CollectionModel::Mixed
523 {
524 return Ok(());
525 }
526
527 let rules = resolved_uniqueness_rules(&contract);
528 if rules.is_empty() {
529 return Ok(());
530 }
531
532 for rule in &rules {
533 let mut seen = std::collections::HashMap::<String, usize>::new();
534 for (row_index, fields) in rows.iter().enumerate() {
535 let input_fields: std::collections::BTreeMap<String, Value> =
536 fields.iter().cloned().collect();
537 let mut signatures = Vec::new();
538 let mut skip_rule = false;
539
540 for column in &rule.columns {
541 match input_fields.get(column) {
542 Some(Value::Null) | None if rule.primary_key => {
543 return Err(crate::RedDBError::Query(format!(
544 "primary key '{}' in collection '{}' requires non-null column '{}'",
545 rule.name, collection, column
546 )))
547 }
548 Some(Value::Null) | None => {
549 skip_rule = true;
550 break;
551 }
552 Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
553 }
554 }
555
556 if skip_rule {
557 continue;
558 }
559
560 let signature = signatures.join("|");
561 if let Some(previous_index) = seen.insert(signature, row_index) {
562 return Err(crate::RedDBError::Query(format!(
563 "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
564 rule.name,
565 collection,
566 previous_index + 1,
567 row_index + 1
568 )));
569 }
570 }
571 }
572
573 Ok(())
574}
575
576fn row_update_requires_uniqueness_check(
577 db: &crate::storage::unified::devx::RedDB,
578 collection: &str,
579 modified_columns: &[String],
580) -> bool {
581 if modified_columns.is_empty() {
582 return false;
583 }
584
585 let Some(contract) = db.collection_contract(collection) else {
586 return false;
587 };
588 if contract.declared_model != crate::catalog::CollectionModel::Table
589 && contract.declared_model != crate::catalog::CollectionModel::Mixed
590 {
591 return false;
592 }
593
594 let rules = resolved_uniqueness_rules(&contract);
595 if rules.is_empty() {
596 return false;
597 }
598
599 rules.iter().any(|rule| {
600 rule.columns.iter().any(|column| {
601 modified_columns
602 .iter()
603 .any(|modified| modified.eq_ignore_ascii_case(column))
604 })
605 })
606}
607
608pub(crate) fn build_row_update_contract_plan(
609 db: &crate::storage::unified::devx::RedDB,
610 collection: &str,
611) -> RedDBResult<Option<RowUpdateContractPlan>> {
612 let Some(contract) = db.collection_contract(collection) else {
613 return Ok(None);
614 };
615
616 let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
617 && !(contract.declared_columns.is_empty()
618 && contract
619 .table_def
620 .as_ref()
621 .map(|table| table.columns.is_empty())
622 .unwrap_or(true))
623 {
624 resolved_contract_columns(&contract)?
625 .into_iter()
626 .map(|rule| {
627 (
628 rule.name.clone(),
629 RowUpdateColumnRule {
630 name: rule.name,
631 data_type: rule.data_type,
632 data_type_name: rule.data_type_name,
633 not_null: rule.not_null,
634 enum_variants: rule.enum_variants,
635 },
636 )
637 })
638 .collect()
639 } else {
640 HashMap::new()
641 };
642
643 let unique_columns = if matches!(
644 contract.declared_model,
645 crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
646 ) {
647 resolved_uniqueness_rules(&contract)
648 .into_iter()
649 .flat_map(|rule| rule.columns.into_iter())
650 .map(|column| (column, ()))
651 .collect()
652 } else {
653 HashMap::new()
654 };
655
656 Ok(Some(RowUpdateContractPlan {
657 timestamps_enabled: contract.timestamps_enabled,
658 strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
659 declared_rules,
660 unique_columns,
661 }))
662}
663
664pub(crate) fn normalize_row_update_assignment_with_plan(
665 collection: &str,
666 column: &str,
667 value: Value,
668 row_contract_plan: Option<&RowUpdateContractPlan>,
669) -> RedDBResult<Value> {
670 let Some(plan) = row_contract_plan else {
671 return Ok(value);
672 };
673
674 if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
675 return Err(crate::RedDBError::Query(format!(
676 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
677 collection, column
678 )));
679 }
680
681 if let Some(rule) = plan.declared_rules.get(column) {
682 let rule = ResolvedColumnRule {
683 name: rule.name.clone(),
684 data_type: rule.data_type,
685 data_type_name: rule.data_type_name.clone(),
686 not_null: rule.not_null,
687 default: None,
688 enum_variants: rule.enum_variants.clone(),
689 };
690 normalize_contract_value(collection, &rule, value)
691 } else if plan.strict_schema {
692 Err(crate::RedDBError::Query(format!(
693 "collection '{}' is strict and does not allow undeclared fields: {}",
694 collection, column
695 )))
696 } else {
697 Ok(value)
698 }
699}
700
701pub(crate) fn normalize_row_update_value_for_rule(
702 collection: &str,
703 value: Value,
704 row_rule: Option<&RowUpdateColumnRule>,
705) -> RedDBResult<Value> {
706 let Some(rule) = row_rule else {
707 return Ok(value);
708 };
709
710 let rule = ResolvedColumnRule {
711 name: rule.name.clone(),
712 data_type: rule.data_type,
713 data_type_name: rule.data_type_name.clone(),
714 not_null: rule.not_null,
715 default: None,
716 enum_variants: rule.enum_variants.clone(),
717 };
718 normalize_contract_value(collection, &rule, value)
719}
720
721fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
722 if let Some(named) = row.named.as_mut() {
723 named.insert(name.to_string(), value);
724 return;
725 }
726
727 if let Some(schema) = row.schema.as_ref() {
728 if let Some(index) = schema.iter().position(|column| column == name) {
729 if let Some(slot) = row.columns.get_mut(index) {
730 *slot = value;
731 return;
732 }
733 }
734
735 let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
736 for (column, current) in schema.iter().zip(row.columns.iter()) {
737 named.insert(column.clone(), current.clone());
738 }
739 named.insert(name.to_string(), value);
740 row.named = Some(named);
741 return;
742 }
743
744 let mut named = HashMap::with_capacity(1);
745 named.insert(name.to_string(), value);
746 row.named = Some(named);
747}
748
749fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
750 if let Some(named) = row.named.as_ref() {
751 named
752 .iter()
753 .map(|(key, value)| (key.clone(), value.clone()))
754 .collect()
755 } else if let Some(schema) = row.schema.as_ref() {
756 schema
757 .iter()
758 .cloned()
759 .zip(row.columns.iter().cloned())
760 .collect()
761 } else {
762 Vec::new()
763 }
764}
765
766fn apply_row_field_assignments_raw<I>(
767 row: &mut crate::storage::unified::entity::RowData,
768 field_assignments: I,
769) where
770 I: IntoIterator<Item = (String, Value)>,
771{
772 for (column, value) in field_assignments {
773 set_row_field(row, &column, value);
774 }
775}
776
777fn apply_row_field_assignments_incremental<I>(
778 collection: &str,
779 row: &mut crate::storage::unified::entity::RowData,
780 field_assignments: I,
781 row_contract_plan: Option<&RowUpdateContractPlan>,
782) -> RedDBResult<()>
783where
784 I: IntoIterator<Item = (String, Value)>,
785{
786 for (column, value) in field_assignments {
787 let value = normalize_row_update_assignment_with_plan(
788 collection,
789 &column,
790 value,
791 row_contract_plan,
792 )?;
793
794 set_row_field(row, &column, value);
795 }
796
797 Ok(())
798}
799
800fn resolved_uniqueness_rules(
801 contract: &crate::physical::CollectionContract,
802) -> Vec<UniquenessRule> {
803 let mut rules = Vec::new();
804
805 if let Some(table_def) = &contract.table_def {
806 if !table_def.primary_key.is_empty() {
807 rules.push(UniquenessRule {
808 name: "primary_key".to_string(),
809 columns: table_def.primary_key.clone(),
810 primary_key: true,
811 });
812 }
813
814 for constraint in &table_def.constraints {
815 if matches!(
816 constraint.constraint_type,
817 crate::storage::schema::ConstraintType::PrimaryKey
818 ) && !constraint.columns.is_empty()
819 {
820 rules.push(UniquenessRule {
821 name: constraint.name.clone(),
822 columns: constraint.columns.clone(),
823 primary_key: true,
824 });
825 } else if matches!(
826 constraint.constraint_type,
827 crate::storage::schema::ConstraintType::Unique
828 ) && !constraint.columns.is_empty()
829 {
830 rules.push(UniquenessRule {
831 name: constraint.name.clone(),
832 columns: constraint.columns.clone(),
833 primary_key: false,
834 });
835 }
836 }
837 } else {
838 for column in &contract.declared_columns {
839 if column.primary_key {
840 rules.push(UniquenessRule {
841 name: format!("pk_{}", column.name),
842 columns: vec![column.name.clone()],
843 primary_key: true,
844 });
845 } else if column.unique {
846 rules.push(UniquenessRule {
847 name: format!("uniq_{}", column.name),
848 columns: vec![column.name.clone()],
849 primary_key: false,
850 });
851 }
852 }
853 }
854
855 let mut dedup = std::collections::BTreeSet::new();
856 rules
857 .into_iter()
858 .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
859 .collect()
860}
861
862fn row_fields_from_entity(
863 entity: &crate::storage::UnifiedEntity,
864) -> Option<std::collections::BTreeMap<String, Value>> {
865 match &entity.data {
866 crate::storage::EntityData::Row(row) => {
867 if let Some(named) = &row.named {
868 Some(
869 named
870 .iter()
871 .map(|(key, value)| (key.clone(), value.clone()))
872 .collect(),
873 )
874 } else {
875 row.schema.as_ref().map(|schema| {
876 schema
877 .iter()
878 .cloned()
879 .zip(row.columns.iter().cloned())
880 .collect()
881 })
882 }
883 }
884 _ => None,
885 }
886}
887
888fn value_signature(value: &Value) -> String {
889 format!("{value:?}")
890}
891
892fn normalize_contract_value(
893 collection: &str,
894 column: &ResolvedColumnRule,
895 value: Value,
896) -> RedDBResult<Value> {
897 if matches!(value, Value::Null) {
898 if column.not_null {
899 return Err(crate::RedDBError::Query(format!(
900 "column '{}' in collection '{}' cannot be null",
901 column.name, collection
902 )));
903 }
904 return Ok(Value::Null);
905 }
906
907 let target = column.data_type;
908 if value_matches_declared_type(&value, target) {
909 return Ok(value);
910 }
911
912 let Some(raw) = value_to_coercion_input(&value) else {
913 return Err(crate::RedDBError::Query(format!(
914 "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
915 column.name, collection, column.data_type_name
916 )));
917 };
918
919 coerce_contract_literal(collection, &column.name, column, &raw)
920}
921
922fn coerce_contract_literal(
923 collection: &str,
924 column_name: &str,
925 column: &ResolvedColumnRule,
926 raw: &str,
927) -> RedDBResult<Value> {
928 let target = column.data_type;
929 match target {
930 DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
931 DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
932 DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
933 crate::RedDBError::Query(format!(
934 "failed to coerce column '{}' in collection '{}' to '{}': {}",
935 column_name, collection, column.data_type_name, err
936 ))
937 }),
938 DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
939 crate::RedDBError::Query(format!(
940 "failed to coerce column '{}' in collection '{}' to '{}': {}",
941 column_name, collection, column.data_type_name, err
942 ))
943 }),
944 DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
945 "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
946 column_name, collection, column.data_type_name
947 ))),
948 _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
949 |err| {
950 crate::RedDBError::Query(format!(
951 "failed to coerce column '{}' in collection '{}' to '{}': {}",
952 column_name, collection, column.data_type_name, err
953 ))
954 },
955 ),
956 }
957}
958
959struct ResolvedColumnRule {
960 name: String,
961 data_type: DataType,
962 data_type_name: String,
963 not_null: bool,
964 default: Option<String>,
965 enum_variants: Vec<String>,
966}
967
968fn resolved_contract_columns(
969 contract: &crate::physical::CollectionContract,
970) -> RedDBResult<Vec<ResolvedColumnRule>> {
971 if let Some(table_def) = &contract.table_def {
972 return Ok(table_def
973 .columns
974 .iter()
975 .map(|column| ResolvedColumnRule {
976 name: column.name.clone(),
977 data_type: column.data_type,
978 data_type_name: data_type_name(column.data_type).to_string(),
979 not_null: !column.nullable,
980 default: column
981 .default
982 .as_ref()
983 .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
984 enum_variants: column.enum_variants.clone(),
985 })
986 .collect());
987 }
988
989 contract
990 .declared_columns
991 .iter()
992 .map(|column| {
993 let data_type = column
994 .sql_type
995 .as_ref()
996 .map(crate::storage::query::resolve_sql_type_name)
997 .transpose()
998 .map_err(|err| crate::RedDBError::Query(err.to_string()))?
999 .unwrap_or(parse_declared_data_type(&column.data_type)?);
1000 Ok(ResolvedColumnRule {
1001 name: column.name.clone(),
1002 data_type,
1003 data_type_name: column.data_type.clone(),
1004 not_null: column.not_null,
1005 default: column.default.clone(),
1006 enum_variants: column.enum_variants.clone(),
1007 })
1008 })
1009 .collect()
1010}
1011
1012fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1013 resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1014}
1015
1016fn data_type_name(data_type: DataType) -> &'static str {
1017 match data_type {
1018 DataType::Integer => "integer",
1019 DataType::UnsignedInteger => "unsigned_integer",
1020 DataType::Float => "float",
1021 DataType::Text => "text",
1022 DataType::Blob => "blob",
1023 DataType::Boolean => "boolean",
1024 DataType::Timestamp => "timestamp",
1025 DataType::Duration => "duration",
1026 DataType::IpAddr => "ipaddr",
1027 DataType::MacAddr => "macaddr",
1028 DataType::Vector => "vector",
1029 DataType::Nullable => "nullable",
1030 DataType::Unknown => "unknown",
1031 DataType::Json => "json",
1032 DataType::Uuid => "uuid",
1033 DataType::NodeRef => "noderef",
1034 DataType::EdgeRef => "edgeref",
1035 DataType::VectorRef => "vectorref",
1036 DataType::RowRef => "rowref",
1037 DataType::Color => "color",
1038 DataType::Email => "email",
1039 DataType::Url => "url",
1040 DataType::Phone => "phone",
1041 DataType::Semver => "semver",
1042 DataType::Cidr => "cidr",
1043 DataType::Date => "date",
1044 DataType::Time => "time",
1045 DataType::Decimal => "decimal",
1046 DataType::Enum => "enum",
1047 DataType::Array => "array",
1048 DataType::TimestampMs => "timestamp_ms",
1049 DataType::Ipv4 => "ipv4",
1050 DataType::Ipv6 => "ipv6",
1051 DataType::Subnet => "subnet",
1052 DataType::Port => "port",
1053 DataType::Latitude => "latitude",
1054 DataType::Longitude => "longitude",
1055 DataType::GeoPoint => "geopoint",
1056 DataType::Country2 => "country2",
1057 DataType::Country3 => "country3",
1058 DataType::Lang2 => "lang2",
1059 DataType::Lang5 => "lang5",
1060 DataType::Currency => "currency",
1061 DataType::AssetCode => "asset_code",
1062 DataType::Money => "money",
1063 DataType::ColorAlpha => "color_alpha",
1064 DataType::BigInt => "bigint",
1065 DataType::KeyRef => "keyref",
1066 DataType::DocRef => "docref",
1067 DataType::TableRef => "tableref",
1068 DataType::PageRef => "pageref",
1069 DataType::Secret => "secret",
1070 DataType::Password => "password",
1071 DataType::TextZstd => "text",
1072 DataType::BlobZstd => "blob",
1073 }
1074}
1075
1076fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1077 matches!(
1078 (value, target),
1079 (Value::Null, _)
1080 | (Value::Integer(_), DataType::Integer)
1081 | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1082 | (Value::Float(_), DataType::Float)
1083 | (Value::Text(_), DataType::Text)
1084 | (Value::Blob(_), DataType::Blob)
1085 | (Value::Boolean(_), DataType::Boolean)
1086 | (Value::Timestamp(_), DataType::Timestamp)
1087 | (Value::Duration(_), DataType::Duration)
1088 | (Value::IpAddr(_), DataType::IpAddr)
1089 | (Value::MacAddr(_), DataType::MacAddr)
1090 | (Value::Vector(_), DataType::Vector)
1091 | (Value::Json(_), DataType::Json)
1092 | (Value::Uuid(_), DataType::Uuid)
1093 | (Value::NodeRef(_), DataType::NodeRef)
1094 | (Value::EdgeRef(_), DataType::EdgeRef)
1095 | (Value::VectorRef(_, _), DataType::VectorRef)
1096 | (Value::RowRef(_, _), DataType::RowRef)
1097 | (Value::Color(_), DataType::Color)
1098 | (Value::Email(_), DataType::Email)
1099 | (Value::Url(_), DataType::Url)
1100 | (Value::Phone(_), DataType::Phone)
1101 | (Value::Semver(_), DataType::Semver)
1102 | (Value::Cidr(_, _), DataType::Cidr)
1103 | (Value::Date(_), DataType::Date)
1104 | (Value::Time(_), DataType::Time)
1105 | (Value::Decimal(_), DataType::Decimal)
1106 | (Value::EnumValue(_), DataType::Enum)
1107 | (Value::Array(_), DataType::Array)
1108 | (Value::TimestampMs(_), DataType::TimestampMs)
1109 | (Value::Ipv4(_), DataType::Ipv4)
1110 | (Value::Ipv6(_), DataType::Ipv6)
1111 | (Value::Subnet(_, _), DataType::Subnet)
1112 | (Value::Port(_), DataType::Port)
1113 | (Value::Latitude(_), DataType::Latitude)
1114 | (Value::Longitude(_), DataType::Longitude)
1115 | (Value::GeoPoint(_, _), DataType::GeoPoint)
1116 | (Value::Country2(_), DataType::Country2)
1117 | (Value::Country3(_), DataType::Country3)
1118 | (Value::Lang2(_), DataType::Lang2)
1119 | (Value::Lang5(_), DataType::Lang5)
1120 | (Value::Currency(_), DataType::Currency)
1121 | (Value::ColorAlpha(_), DataType::ColorAlpha)
1122 | (Value::BigInt(_), DataType::BigInt)
1123 | (Value::KeyRef(_, _), DataType::KeyRef)
1124 | (Value::DocRef(_, _), DataType::DocRef)
1125 | (Value::TableRef(_), DataType::TableRef)
1126 | (Value::PageRef(_), DataType::PageRef)
1127 | (Value::Secret(_), DataType::Secret)
1128 | (Value::Password(_), DataType::Password)
1129 )
1130}
1131
1132fn value_to_coercion_input(value: &Value) -> Option<String> {
1133 match value {
1134 Value::Null => None,
1135 Value::Integer(value) => Some(value.to_string()),
1136 Value::UnsignedInteger(value) => Some(value.to_string()),
1137 Value::Float(value) => Some(value.to_string()),
1138 Value::Text(value) => Some(value.to_string()),
1139 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1140 Value::Boolean(value) => Some(value.to_string()),
1141 Value::Timestamp(value) => Some(value.to_string()),
1142 Value::Duration(value) => Some(value.to_string()),
1143 Value::IpAddr(value) => Some(value.to_string()),
1144 Value::MacAddr(value) => Some(format!(
1145 "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1146 value[0], value[1], value[2], value[3], value[4], value[5]
1147 )),
1148 Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1149 Value::Email(value) => Some(value.clone()),
1150 Value::Url(value) => Some(value.clone()),
1151 Value::Phone(value) => Some(value.to_string()),
1152 Value::Semver(value) => Some(format!(
1153 "{}.{}.{}",
1154 value / 1_000_000,
1155 (value / 1_000) % 1_000,
1156 value % 1_000
1157 )),
1158 Value::Date(value) => Some(value.to_string()),
1159 Value::Time(value) => Some(value.to_string()),
1160 Value::Decimal(value) => Some(value.to_string()),
1161 Value::TimestampMs(value) => Some(value.to_string()),
1162 Value::Ipv4(value) => Some(format!(
1163 "{}.{}.{}.{}",
1164 (value >> 24) & 0xFF,
1165 (value >> 16) & 0xFF,
1166 (value >> 8) & 0xFF,
1167 value & 0xFF
1168 )),
1169 Value::Port(value) => Some(value.to_string()),
1170 Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1171 Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1172 Value::GeoPoint(lat, lon) => Some(format!(
1173 "{},{}",
1174 *lat as f64 / 1_000_000.0,
1175 *lon as f64 / 1_000_000.0
1176 )),
1177 Value::BigInt(value) => Some(value.to_string()),
1178 Value::TableRef(value) => Some(value.clone()),
1179 Value::PageRef(value) => Some(value.to_string()),
1180 Value::Password(value) => Some(value.clone()),
1181 _ => None,
1182 }
1183}
1184
1185fn dedupe_modified_columns(mut modified_columns: Vec<String>) -> Vec<String> {
1186 if modified_columns.is_empty() {
1187 return modified_columns;
1188 }
1189
1190 let mut unique = Vec::with_capacity(modified_columns.len());
1191 for column in modified_columns.drain(..) {
1192 if !unique
1193 .iter()
1194 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1195 {
1196 unique.push(column);
1197 }
1198 }
1199 unique
1200}
1201
1202fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1203 if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1204 return Err(crate::RedDBError::Query(
1205 "array positional document patch paths are unsupported; replace the array or full document body instead"
1206 .to_string(),
1207 ));
1208 }
1209 Ok(())
1210}
1211
1212fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1213 match fields.get("body") {
1214 Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1215 crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1216 }),
1217 Some(_) => Err(crate::RedDBError::Query(
1218 "document body field must contain JSON".to_string(),
1219 )),
1220 None => Ok(JsonValue::Object(Default::default())),
1221 }
1222}
1223
1224fn replace_document_row_body(
1225 fields: &mut HashMap<String, Value>,
1226 body: JsonValue,
1227 modified_columns: &mut Vec<String>,
1228) -> RedDBResult<()> {
1229 modified_columns.push("body".to_string());
1230
1231 let old_keys: Vec<String> = fields
1232 .keys()
1233 .filter(|key| key.as_str() != "body")
1234 .cloned()
1235 .collect();
1236 for key in old_keys {
1237 fields.remove(&key);
1238 modified_columns.push(key);
1239 }
1240
1241 let body_bytes = json_to_vec(&body).map_err(|err| {
1242 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1243 })?;
1244 fields.insert("body".to_string(), Value::Json(body_bytes));
1245
1246 if let JsonValue::Object(map) = &body {
1247 for (key, value) in map {
1248 fields.insert(key.clone(), json_to_storage_value(value)?);
1249 modified_columns.push(key.clone());
1250 }
1251 }
1252
1253 Ok(())
1254}
1255
1256impl RedDBRuntime {
1257 pub(crate) fn apply_loaded_patch_entity_core(
1258 &self,
1259 collection: String,
1260 mut entity: crate::storage::UnifiedEntity,
1261 payload: JsonValue,
1262 operations: Vec<PatchEntityOperation>,
1263 ) -> RedDBResult<AppliedEntityMutation> {
1264 let id = entity.id;
1265 let operations = normalize_ttl_patch_operations(operations)?;
1266 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1269
1270 let db = self.db();
1271 let store = db.store();
1272 let Some(manager) = store.get_collection(&collection) else {
1273 return Err(crate::RedDBError::NotFound(format!(
1274 "collection not found: {collection}"
1275 )));
1276 };
1277
1278 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1279 let mut metadata_changed = false;
1280 let mut modified_columns: Vec<String> = Vec::new();
1281 let mut context_index_dirty = false;
1282 let mut graph_node_type: Option<String> = None;
1283 let mut graph_edge_weight: Option<f32> = None;
1284
1285 let row_contract_timestamps = db
1286 .collection_contract(&collection)
1287 .map(|c| c.timestamps_enabled)
1288 .unwrap_or(false);
1289
1290 match &mut entity.data {
1291 crate::storage::EntityData::Row(row) => {
1292 let is_document_collection = db
1293 .collection_contract(&collection)
1294 .map(|contract| {
1295 contract.declared_model == crate::catalog::CollectionModel::Document
1296 })
1297 .unwrap_or(false);
1298 let mut field_ops = Vec::new();
1299 let mut metadata_ops = Vec::new();
1300 let mut document_body_ops = Vec::new();
1301
1302 for mut op in operations {
1303 let Some(root) = op.path.first().map(String::as_str) else {
1304 return Err(crate::RedDBError::Query(
1305 "patch path cannot be empty".to_string(),
1306 ));
1307 };
1308
1309 match root {
1310 "body" if is_document_collection => {
1311 if op.path.len() < 2 {
1312 return Err(crate::RedDBError::Query(
1313 "document body patch paths require a nested key; use payload.body for full replacement"
1314 .to_string(),
1315 ));
1316 }
1317 op.path.remove(0);
1318 reject_document_array_position_path(&op.path)?;
1319 document_body_ops.push(op);
1320 }
1321 "fields" | "named" => {
1322 if op.path.len() < 2 {
1323 return Err(crate::RedDBError::Query(
1324 "patch path 'fields' requires a nested key".to_string(),
1325 ));
1326 }
1327 if row_contract_timestamps {
1328 let leaf = op.path.get(1).map(String::as_str);
1329 if matches!(leaf, Some("created_at") | Some("updated_at")) {
1330 return Err(crate::RedDBError::Query(format!(
1331 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1332 collection,
1333 leaf.unwrap_or("")
1334 )));
1335 }
1336 }
1337 op.path.remove(0);
1338 field_ops.push(op);
1339 }
1340 "metadata" => {
1341 if op.path.len() < 2 {
1342 return Err(crate::RedDBError::Query(
1343 "patch path 'metadata' requires a nested key".to_string(),
1344 ));
1345 }
1346 op.path.remove(0);
1347 metadata_ops.push(op);
1348 }
1349 _ => {
1350 return Err(crate::RedDBError::Query(format!(
1351 "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1352 )));
1353 }
1354 }
1355 }
1356
1357 if !document_body_ops.is_empty() {
1358 context_index_dirty = true;
1359 let named = row.named.get_or_insert_with(Default::default);
1360 let mut body = document_body_from_named(named)?;
1361 apply_patch_operations_to_json(&mut body, &document_body_ops)
1362 .map_err(crate::RedDBError::Query)?;
1363 replace_document_row_body(named, body, &mut modified_columns)?;
1364 }
1365
1366 if is_document_collection {
1367 if let Some(body) = payload.get("body") {
1368 context_index_dirty = true;
1369 let named = row.named.get_or_insert_with(Default::default);
1370 replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1371 }
1372 }
1373
1374 if !field_ops.is_empty() {
1375 context_index_dirty = true;
1376 for op in &field_ops {
1377 if let Some(col) = op.path.first() {
1378 modified_columns.push(col.clone());
1379 }
1380 }
1381 let named = row.named.get_or_insert_with(Default::default);
1382 apply_patch_operations_to_storage_map(named, &field_ops)?;
1383 }
1384
1385 if let Some(fields) = payload
1386 .get("fields")
1387 .and_then(crate::json::Value::as_object)
1388 {
1389 if row_contract_timestamps {
1390 for key in fields.keys() {
1391 if key == "created_at" || key == "updated_at" {
1392 return Err(crate::RedDBError::Query(format!(
1393 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1394 collection, key
1395 )));
1396 }
1397 }
1398 }
1399 context_index_dirty = true;
1400 let named = row.named.get_or_insert_with(Default::default);
1401 for (key, value) in fields {
1402 modified_columns.push(key.clone());
1403 named.insert(key.clone(), json_to_storage_value(value)?);
1404 }
1405 }
1406
1407 if !metadata_ops.is_empty() {
1408 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1409 let metadata = patch_metadata.get_or_insert_with(|| {
1410 store.get_metadata(&collection, id).unwrap_or_default()
1411 });
1412 let mut metadata_json = metadata_to_json(metadata);
1413 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1414 .map_err(crate::RedDBError::Query)?;
1415 *metadata = metadata_from_json(&metadata_json)?;
1416 metadata_changed = true;
1417 }
1418
1419 if !modified_columns.is_empty() || row_contract_timestamps {
1420 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1421 let current_fields = if let Some(named) = row.named.take() {
1422 named.into_iter().collect::<Vec<_>>()
1423 } else if let Some(schema) = row.schema.as_ref() {
1424 schema
1425 .iter()
1426 .cloned()
1427 .zip(row.columns.iter().cloned())
1428 .collect::<Vec<_>>()
1429 } else {
1430 Vec::new()
1431 };
1432 let normalized_fields = contract.normalize_update_fields(current_fields)?;
1433 if row_contract_timestamps {
1434 modified_columns.push("updated_at".to_string());
1435 context_index_dirty = true;
1436 }
1437 if contract.requires_uniqueness_check(&modified_columns) {
1438 contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1439 }
1440 row.named = Some(normalized_fields.into_iter().collect());
1441 }
1442 }
1443 crate::storage::EntityData::Node(node) => {
1444 let mut field_ops = Vec::new();
1445 let mut metadata_ops = Vec::new();
1446 let mut node_type_ops = Vec::new();
1447
1448 for mut op in operations {
1449 let Some(root) = op.path.first().map(String::as_str) else {
1450 return Err(crate::RedDBError::Query(
1451 "patch path cannot be empty".to_string(),
1452 ));
1453 };
1454
1455 match root {
1456 "fields" | "properties" => {
1457 if op.path.len() < 2 {
1458 return Err(crate::RedDBError::Query(
1459 "patch path 'fields' requires a nested key".to_string(),
1460 ));
1461 }
1462 op.path.remove(0);
1463 field_ops.push(op);
1464 }
1465 "metadata" => {
1466 if op.path.len() < 2 {
1467 return Err(crate::RedDBError::Query(
1468 "patch path 'metadata' requires a nested key".to_string(),
1469 ));
1470 }
1471 op.path.remove(0);
1472 metadata_ops.push(op);
1473 }
1474 "node_type" => {
1475 if op.path.len() != 1 {
1476 return Err(crate::RedDBError::Query(
1477 "patch path 'node_type' does not allow nested keys".to_string(),
1478 ));
1479 }
1480 op.path.clear();
1481 node_type_ops.push(op);
1482 }
1483 _ => {
1484 return Err(crate::RedDBError::Query(format!(
1485 "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1486 )));
1487 }
1488 }
1489 }
1490
1491 for op in node_type_ops {
1492 context_index_dirty = true;
1493 let value = op.value.ok_or_else(|| {
1494 crate::RedDBError::Query("node_type operations require a value".to_string())
1495 })?;
1496
1497 match op.op {
1498 PatchEntityOperationType::Unset => {
1499 return Err(crate::RedDBError::Query(
1500 "node_type cannot be unset through patch operations".to_string(),
1501 ));
1502 }
1503 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1504 let Some(node_type) = value.as_str() else {
1505 return Err(crate::RedDBError::Query(
1506 "node_type operation requires a text value".to_string(),
1507 ));
1508 };
1509 graph_node_type = Some(node_type.to_string());
1510 modified_columns.push("node_type".to_string());
1511 }
1512 }
1513 }
1514
1515 if !field_ops.is_empty() {
1516 context_index_dirty = true;
1517 apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1518 }
1519
1520 if let Some(fields) = payload
1521 .get("fields")
1522 .and_then(crate::json::Value::as_object)
1523 {
1524 context_index_dirty = true;
1525 for (key, value) in fields {
1526 node.properties
1527 .insert(key.clone(), json_to_storage_value(value)?);
1528 }
1529 }
1530
1531 if !metadata_ops.is_empty() {
1532 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1533 let metadata = patch_metadata.get_or_insert_with(|| {
1534 store.get_metadata(&collection, id).unwrap_or_default()
1535 });
1536 let mut metadata_json = metadata_to_json(metadata);
1537 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1538 .map_err(crate::RedDBError::Query)?;
1539 *metadata = metadata_from_json(&metadata_json)?;
1540 metadata_changed = true;
1541 }
1542 }
1543 crate::storage::EntityData::Edge(edge) => {
1544 let mut field_ops = Vec::new();
1545 let mut metadata_ops = Vec::new();
1546 let mut weight_ops = Vec::new();
1547
1548 for mut op in operations {
1549 let Some(root) = op.path.first().map(String::as_str) else {
1550 return Err(crate::RedDBError::Query(
1551 "patch path cannot be empty".to_string(),
1552 ));
1553 };
1554
1555 match root {
1556 "fields" | "properties" => {
1557 if op.path.len() < 2 {
1558 return Err(crate::RedDBError::Query(
1559 "patch path 'fields' requires a nested key".to_string(),
1560 ));
1561 }
1562 op.path.remove(0);
1563 field_ops.push(op);
1564 }
1565 "weight" => {
1566 if op.path.len() != 1 {
1567 return Err(crate::RedDBError::Query(
1568 "patch path 'weight' does not allow nested keys".to_string(),
1569 ));
1570 }
1571 op.path.clear();
1572 weight_ops.push(op);
1573 }
1574 "metadata" => {
1575 if op.path.len() < 2 {
1576 return Err(crate::RedDBError::Query(
1577 "patch path 'metadata' requires a nested key".to_string(),
1578 ));
1579 }
1580 op.path.remove(0);
1581 metadata_ops.push(op);
1582 }
1583 _ => {
1584 return Err(crate::RedDBError::Query(format!(
1585 "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1586 )));
1587 }
1588 }
1589 }
1590
1591 if !field_ops.is_empty() {
1592 context_index_dirty = true;
1593 apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1594 }
1595
1596 for op in weight_ops {
1597 context_index_dirty = true;
1598 let value = op.value.ok_or_else(|| {
1599 crate::RedDBError::Query("weight operations require a value".to_string())
1600 })?;
1601
1602 match op.op {
1603 PatchEntityOperationType::Unset => {
1604 return Err(crate::RedDBError::Query(
1605 "weight cannot be unset through patch operations".to_string(),
1606 ));
1607 }
1608 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1609 let Some(weight) = value.as_f64() else {
1610 return Err(crate::RedDBError::Query(
1611 "weight operation requires a numeric value".to_string(),
1612 ));
1613 };
1614 graph_edge_weight = Some(weight as f32);
1615 edge.weight = weight as f32;
1616 modified_columns.push("weight".to_string());
1617 }
1618 }
1619 }
1620
1621 if let Some(fields) = payload
1622 .get("fields")
1623 .and_then(crate::json::Value::as_object)
1624 {
1625 context_index_dirty = true;
1626 for (key, value) in fields {
1627 edge.properties
1628 .insert(key.clone(), json_to_storage_value(value)?);
1629 }
1630 }
1631
1632 if !metadata_ops.is_empty() {
1633 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1634 let metadata = patch_metadata.get_or_insert_with(|| {
1635 store.get_metadata(&collection, id).unwrap_or_default()
1636 });
1637 let mut metadata_json = metadata_to_json(metadata);
1638 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1639 .map_err(crate::RedDBError::Query)?;
1640 *metadata = metadata_from_json(&metadata_json)?;
1641 metadata_changed = true;
1642 }
1643 }
1644 crate::storage::EntityData::Vector(vector) => {
1645 let mut field_ops = Vec::new();
1646 let mut metadata_ops = Vec::new();
1647
1648 for mut op in operations {
1649 let Some(root) = op.path.first().map(String::as_str) else {
1650 return Err(crate::RedDBError::Query(
1651 "patch path cannot be empty".to_string(),
1652 ));
1653 };
1654
1655 match root {
1656 "fields" => {
1657 if op.path.len() < 2 {
1658 return Err(crate::RedDBError::Query(
1659 "patch path 'fields' requires a nested key".to_string(),
1660 ));
1661 }
1662 op.path.remove(0);
1663 let Some(target) = op.path.first().map(String::as_str) else {
1664 return Err(crate::RedDBError::Query(
1665 "patch path requires a target under fields".to_string(),
1666 ));
1667 };
1668 if !matches!(target, "dense" | "content" | "sparse") {
1669 return Err(crate::RedDBError::Query(format!(
1670 "unsupported vector patch target '{target}'"
1671 )));
1672 }
1673 field_ops.push(op);
1674 }
1675 "metadata" => {
1676 if op.path.len() < 2 {
1677 return Err(crate::RedDBError::Query(
1678 "patch path 'metadata' requires a nested key".to_string(),
1679 ));
1680 }
1681 op.path.remove(0);
1682 metadata_ops.push(op);
1683 }
1684 _ => {
1685 return Err(crate::RedDBError::Query(format!(
1686 "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1687 )));
1688 }
1689 }
1690 }
1691
1692 if !field_ops.is_empty() {
1693 context_index_dirty = true;
1694 apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1695 }
1696
1697 if let Some(fields) = payload
1698 .get("fields")
1699 .and_then(crate::json::Value::as_object)
1700 {
1701 context_index_dirty = true;
1702 if let Some(content) =
1703 fields.get("content").and_then(crate::json::Value::as_str)
1704 {
1705 vector.content = Some(content.to_string());
1706 }
1707 if let Some(dense) = fields.get("dense") {
1708 vector.dense = dense
1709 .as_array()
1710 .ok_or_else(|| {
1711 crate::RedDBError::Query(
1712 "field 'dense' must be an array".to_string(),
1713 )
1714 })?
1715 .iter()
1716 .map(|value| {
1717 value.as_f64().map(|value| value as f32).ok_or_else(|| {
1718 crate::RedDBError::Query(
1719 "field 'dense' must contain only numbers".to_string(),
1720 )
1721 })
1722 })
1723 .collect::<Result<Vec<_>, _>>()?;
1724 }
1725 }
1726
1727 if !metadata_ops.is_empty() {
1728 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1729 let metadata = patch_metadata.get_or_insert_with(|| {
1730 store.get_metadata(&collection, id).unwrap_or_default()
1731 });
1732 let mut metadata_json = metadata_to_json(metadata);
1733 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1734 .map_err(crate::RedDBError::Query)?;
1735 *metadata = metadata_from_json(&metadata_json)?;
1736 metadata_changed = true;
1737 }
1738 }
1739 crate::storage::EntityData::TimeSeries(_)
1740 | crate::storage::EntityData::QueueMessage(_) => {
1741 return Err(crate::RedDBError::Query(
1742 "patch operations are not supported for TimeSeries or QueueMessage entities"
1743 .to_string(),
1744 ));
1745 }
1746 }
1747
1748 if let Some(node_type) = graph_node_type {
1749 if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1750 node.node_type = node_type;
1751 }
1752 }
1753 if let Some(weight) = graph_edge_weight {
1754 if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1755 edge.weight = (weight * 1000.0) as u32;
1756 }
1757 }
1758
1759 if let Some(metadata) = payload
1760 .get("metadata")
1761 .and_then(crate::json::Value::as_object)
1762 {
1763 let patch_metadata = patch_metadata
1764 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1765 for (key, value) in metadata {
1766 ensure_non_tree_reserved_metadata_key(key)?;
1767 patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1768 }
1769 metadata_changed = true;
1770 }
1771
1772 for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1773 let patch_metadata = patch_metadata
1774 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1775 if matches!(value, crate::storage::unified::MetadataValue::Null) {
1776 patch_metadata.remove(&key);
1777 } else {
1778 patch_metadata.set(key, value);
1779 }
1780 metadata_changed = true;
1781 }
1782
1783 entity.updated_at = std::time::SystemTime::now()
1784 .duration_since(std::time::UNIX_EPOCH)
1785 .unwrap_or_default()
1786 .as_secs();
1787
1788 modified_columns = dedupe_modified_columns(modified_columns);
1789
1790 Ok(AppliedEntityMutation {
1791 id,
1792 collection,
1793 entity,
1794 metadata: patch_metadata,
1795 modified_columns,
1796 persist_metadata: metadata_changed,
1797 context_index_dirty,
1798 replaced_entity: None,
1799 replaced_entity_previous_xmax: 0,
1800 pre_mutation_fields,
1801 })
1802 }
1803
1804 pub(crate) fn apply_loaded_sql_update_row_core(
1805 &self,
1806 collection: String,
1807 mut entity: crate::storage::UnifiedEntity,
1808 static_field_assignments: &[(String, Value)],
1809 dynamic_field_assignments: Vec<(String, Value)>,
1810 static_metadata_assignments: &[(String, MetadataValue)],
1811 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1812 row_contract_plan: Option<&RowUpdateContractPlan>,
1813 row_modified_columns_template: &[String],
1814 row_touches_unique_columns: bool,
1815 ) -> RedDBResult<AppliedEntityMutation> {
1816 let id = entity.id;
1817 let previous_xmax = entity.xmax;
1818 let db = self.db();
1819 let store = db.store();
1820 let Some(_) = store.get_collection(&collection) else {
1821 return Err(crate::RedDBError::NotFound(format!(
1822 "collection not found: {collection}"
1823 )));
1824 };
1825
1826 let versioned_update_xid = match self.current_xid() {
1827 Some(xid) => Some(xid),
1828 None => {
1829 let snapshot_manager = self.snapshot_manager();
1830 let xid = snapshot_manager.begin();
1831 snapshot_manager.commit(xid);
1832 Some(xid)
1833 }
1834 };
1835 let mut replaced_entity = versioned_update_xid.map(|xid| {
1836 let mut old = entity.clone();
1837 old.set_xmax(xid);
1838 old
1839 });
1840
1841 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1842 let row_contract_timestamps = row_contract_plan
1843 .map(|plan| plan.timestamps_enabled)
1844 .unwrap_or(false);
1845 let mut metadata_changed = false;
1846 let mut modified_columns = row_modified_columns_template.to_vec();
1847 let mut context_index_dirty = !modified_columns.is_empty();
1848
1849 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1853
1854 let crate::storage::EntityData::Row(row) = &mut entity.data else {
1855 return Err(crate::RedDBError::Query(
1856 "SQL row update fast path requires a row entity".to_string(),
1857 ));
1858 };
1859
1860 let _ = row_contract_plan;
1861 apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1862 apply_row_field_assignments_raw(row, dynamic_field_assignments);
1863
1864 for (key, value) in static_metadata_assignments
1865 .iter()
1866 .cloned()
1867 .chain(dynamic_metadata_assignments)
1868 {
1869 ensure_non_tree_reserved_metadata_key(&key)?;
1870 patch_metadata
1871 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1872 .set(key, value);
1873 metadata_changed = true;
1874 }
1875
1876 if !modified_columns.is_empty() || row_contract_timestamps {
1877 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1878 if row_contract_timestamps {
1879 context_index_dirty = true;
1880 set_row_field(
1881 row,
1882 "updated_at",
1883 Value::UnsignedInteger(current_unix_ms_u64()),
1884 );
1885 modified_columns.push("updated_at".to_string());
1886 }
1887 if row_touches_unique_columns {
1888 let current_fields = collect_row_fields(row);
1889 contract.enforce_row_uniqueness(¤t_fields, Some(id))?;
1890 }
1891 }
1892
1893 entity.updated_at = std::time::SystemTime::now()
1894 .duration_since(std::time::UNIX_EPOCH)
1895 .unwrap_or_default()
1896 .as_secs();
1897
1898 if let Some(xid) = versioned_update_xid {
1899 let logical_id = entity.logical_id();
1900 entity.id = store.next_entity_id();
1901 entity.set_logical_id(logical_id);
1902 entity.set_xmin(xid);
1903 entity.set_xmax(0);
1904 if let Some(old) = replaced_entity.as_mut() {
1905 old.set_xmax(xid);
1906 }
1907 }
1908
1909 modified_columns = dedupe_modified_columns(modified_columns);
1910
1911 Ok(AppliedEntityMutation {
1912 id: entity.id,
1913 collection,
1914 entity,
1915 metadata: patch_metadata,
1916 modified_columns,
1917 persist_metadata: metadata_changed,
1918 context_index_dirty,
1919 replaced_entity,
1920 replaced_entity_previous_xmax: previous_xmax,
1921 pre_mutation_fields,
1922 })
1923 }
1924
1925 pub(crate) fn persist_applied_entity_mutations(
1926 &self,
1927 applied: &[AppliedEntityMutation],
1928 ) -> RedDBResult<()> {
1929 if applied.is_empty() {
1930 return Ok(());
1931 }
1932
1933 let store = self.db().store();
1934 let collection = &applied[0].collection;
1935 let Some(manager) = store.get_collection(collection) else {
1936 return Err(crate::RedDBError::NotFound(format!(
1937 "collection not found: {collection}"
1938 )));
1939 };
1940
1941 let mut ordinary = Vec::with_capacity(applied.len());
1942 for item in applied {
1943 if let Some(old_version) = item.replaced_entity.as_ref() {
1944 store
1945 .install_versioned_table_row_update(
1946 collection,
1947 old_version.clone(),
1948 item.entity.clone(),
1949 if item.persist_metadata {
1950 item.metadata.as_ref()
1951 } else {
1952 None
1953 },
1954 )
1955 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1956 if self.current_xid().is_some() {
1957 self.record_pending_versioned_update(
1958 crate::runtime::impl_core::current_connection_id(),
1959 collection,
1960 old_version.id,
1961 item.entity.id,
1962 old_version.xmax,
1963 item.replaced_entity_previous_xmax,
1964 );
1965 }
1966 } else {
1967 ordinary.push(item);
1968 }
1969 }
1970 if ordinary.is_empty() {
1971 return Ok(());
1972 }
1973
1974 manager
1975 .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1976 (
1977 &item.entity,
1978 item.modified_columns.as_slice(),
1979 if item.persist_metadata {
1980 item.metadata.as_ref()
1981 } else {
1982 None
1983 },
1984 )
1985 }))
1986 .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1987
1988 let indexed_cols = self
1997 .index_store_ref()
1998 .indexed_columns_set(collection.as_str());
1999 let all_hot = !indexed_cols.is_empty()
2000 && ordinary.iter().all(|item| {
2001 !item.persist_metadata
2002 && !item
2003 .modified_columns
2004 .iter()
2005 .any(|c| indexed_cols.contains(c))
2006 })
2007 || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2008
2009 let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2013 ordinary.iter().map(|item| &item.entity).collect();
2014 let persist_fn = if all_hot {
2015 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2016 } else {
2017 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2018 };
2019 persist_fn(store.as_ref(), collection, &entity_refs)
2020 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2021 }
2022
2023 pub(crate) fn flush_applied_entity_mutation(
2024 &self,
2025 applied: &AppliedEntityMutation,
2026 ) -> RedDBResult<()> {
2027 let store = self.db().store();
2028 if applied.context_index_dirty {
2029 store
2030 .context_index()
2031 .index_entity(&applied.collection, &applied.entity);
2032 }
2033 let mut changed_columns: Option<Vec<String>> = None;
2041 if !applied.pre_mutation_fields.is_empty() {
2042 let post = entity_row_fields_snapshot(&applied.entity);
2043 if !post.is_empty() {
2044 let damage = crate::application::entity::row_damage_vector(
2045 &applied.pre_mutation_fields,
2046 &post,
2047 );
2048 if !damage.is_empty() {
2049 changed_columns = Some(
2050 damage
2051 .touched_columns()
2052 .into_iter()
2053 .map(str::to_string)
2054 .collect(),
2055 );
2056 }
2057
2058 let indexed_cols: std::collections::HashSet<String> = self
2067 .index_store_ref()
2068 .list_indices(applied.collection.as_str())
2069 .into_iter()
2070 .filter_map(|idx| idx.columns.first().cloned())
2071 .collect();
2072 let modified_cols: std::collections::HashSet<String> = damage
2073 .touched_columns()
2074 .into_iter()
2075 .map(str::to_string)
2076 .collect();
2077 if let Some(old_version) = applied.replaced_entity.as_ref() {
2078 let old_index_fields: Vec<(String, Value)> = applied
2079 .pre_mutation_fields
2080 .iter()
2081 .filter(|(col, _)| indexed_cols.contains(col))
2082 .cloned()
2083 .collect();
2084 let new_index_fields: Vec<(String, Value)> = post
2085 .iter()
2086 .filter(|(col, _)| indexed_cols.contains(col))
2087 .cloned()
2088 .collect();
2089 if !old_index_fields.is_empty() {
2090 self.index_store_ref()
2091 .index_entity_delete(
2092 &applied.collection,
2093 old_version.id,
2094 &old_index_fields,
2095 )
2096 .map_err(crate::RedDBError::Internal)?;
2097 }
2098 if !new_index_fields.is_empty() {
2099 self.index_store_ref()
2100 .index_entity_insert(
2101 &applied.collection,
2102 applied.entity.id,
2103 &new_index_fields,
2104 )
2105 .map_err(crate::RedDBError::Internal)?;
2106 }
2107 } else {
2108 let decision = crate::storage::engine::hot_update::decide(
2109 &crate::storage::engine::hot_update::HotUpdateInputs {
2110 collection: applied.collection.as_str(),
2111 indexed_columns: &indexed_cols,
2112 modified_columns: &modified_cols,
2113 new_tuple_size: 0,
2117 page_free_space: usize::MAX,
2118 },
2119 );
2120 if !decision.can_hot {
2121 self.index_store_ref()
2122 .index_entity_update(
2123 &applied.collection,
2124 applied.id,
2125 &applied.pre_mutation_fields,
2126 &post,
2127 )
2128 .map_err(crate::RedDBError::Internal)?;
2129 } else {
2130 tracing::debug!(
2134 collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2135 "hot_update fast-path: skipped index_entity_update"
2136 );
2137 }
2138 }
2139 }
2140 }
2141 self.cdc_emit_prebuilt_with_columns(
2142 crate::replication::cdc::ChangeOperation::Update,
2143 &applied.collection,
2144 &applied.entity,
2145 "entity",
2146 applied.metadata.as_ref(),
2147 true,
2148 changed_columns,
2149 );
2150 Ok(())
2151 }
2152
2153 pub(crate) fn apply_loaded_patch_entity(
2154 &self,
2155 collection: String,
2156 entity: crate::storage::UnifiedEntity,
2157 payload: JsonValue,
2158 operations: Vec<PatchEntityOperation>,
2159 ) -> RedDBResult<CreateEntityOutput> {
2160 let applied =
2161 self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2162 self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2163 self.flush_applied_entity_mutation(&applied)?;
2164 Ok(CreateEntityOutput {
2165 id: applied.id,
2166 entity: Some(applied.entity),
2167 })
2168 }
2169}
2170
2171fn ensure_non_tree_reserved_metadata_patch_paths(
2172 operations: &[PatchEntityOperation],
2173) -> RedDBResult<()> {
2174 for operation in operations {
2175 let Some(key) = operation.path.first().map(String::as_str) else {
2176 continue;
2177 };
2178 ensure_non_tree_reserved_metadata_key(key)?;
2179 }
2180 Ok(())
2181}
2182
2183fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2184 if key.starts_with(TREE_METADATA_PREFIX) {
2185 return Err(crate::RedDBError::Query(format!(
2186 "metadata key '{}' is reserved for managed trees",
2187 key
2188 )));
2189 }
2190 Ok(())
2191}
2192
2193fn ensure_non_tree_reserved_metadata_entries(
2194 metadata: &[(String, MetadataValue)],
2195) -> RedDBResult<()> {
2196 for (key, _) in metadata {
2197 ensure_non_tree_reserved_metadata_key(key)?;
2198 }
2199 Ok(())
2200}
2201
2202fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2203 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2204 return Err(crate::RedDBError::Query(format!(
2205 "edge label '{}' is reserved for managed trees",
2206 TREE_CHILD_EDGE_LABEL
2207 )));
2208 }
2209 Ok(())
2210}
2211
2212impl RedDBRuntime {
2213 pub(crate) fn create_node_unchecked(
2214 &self,
2215 input: CreateNodeInput,
2216 ) -> RedDBResult<CreateEntityOutput> {
2217 let db = self.db();
2218 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2219 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2220 let mut metadata = input.metadata;
2221 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2222 let mut builder = db.node(&input.collection, &input.label);
2223
2224 if let Some(node_type) = input.node_type {
2225 builder = builder.node_type(node_type);
2226 }
2227
2228 for (key, value) in input.properties {
2229 builder = builder.property(key, value);
2230 }
2231
2232 for (key, value) in metadata {
2233 builder = builder.metadata(key, value);
2234 }
2235
2236 for embedding in input.embeddings {
2237 if let Some(model) = embedding.model {
2238 builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2239 } else {
2240 builder = builder.embedding(embedding.name, embedding.vector);
2241 }
2242 }
2243
2244 for link in input.table_links {
2245 builder = builder.link_to_table(link.key, link.table);
2246 }
2247
2248 for link in input.node_links {
2249 builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2250 }
2251
2252 let id = builder.save()?;
2253 self.stamp_xmin_if_in_txn(&input.collection, id);
2256 refresh_context_index(&db, &input.collection, id)?;
2257 self.cdc_emit(
2258 crate::replication::cdc::ChangeOperation::Insert,
2259 &input.collection,
2260 id.raw(),
2261 "graph_node",
2262 );
2263 Ok(CreateEntityOutput {
2264 id,
2265 entity: db.store().get(&input.collection, id),
2266 })
2267 }
2268
2269 pub(crate) fn create_edge_unchecked(
2270 &self,
2271 input: CreateEdgeInput,
2272 ) -> RedDBResult<CreateEntityOutput> {
2273 let db = self.db();
2274 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2275 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2276 let mut metadata = input.metadata;
2277 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2278 let mut builder = db
2279 .edge(&input.collection, &input.label)
2280 .from(input.from)
2281 .to(input.to);
2282
2283 if let Some(weight) = input.weight {
2284 builder = builder.weight(weight);
2285 }
2286
2287 for (key, value) in input.properties {
2288 builder = builder.property(key, value);
2289 }
2290
2291 for (key, value) in metadata {
2292 builder = builder.metadata(key, value);
2293 }
2294
2295 let id = builder.save()?;
2296 self.stamp_xmin_if_in_txn(&input.collection, id);
2299 refresh_context_index(&db, &input.collection, id)?;
2300 self.cdc_emit(
2301 crate::replication::cdc::ChangeOperation::Insert,
2302 &input.collection,
2303 id.raw(),
2304 "graph_edge",
2305 );
2306 Ok(CreateEntityOutput {
2307 id,
2308 entity: db.store().get(&input.collection, id),
2309 })
2310 }
2311}
2312
2313fn create_rows_batch_prevalidated_columnar_with_outputs(
2314 runtime: &RedDBRuntime,
2315 collection: String,
2316 column_names: std::sync::Arc<Vec<String>>,
2317 rows: Vec<Vec<crate::storage::schema::Value>>,
2318) -> RedDBResult<Vec<CreateEntityOutput>> {
2319 use crate::storage::{
2320 unified::{EntityData, EntityKind, RowData},
2321 EntityId, UnifiedEntity,
2322 };
2323 use std::sync::Arc;
2324
2325 if rows.is_empty() {
2326 return Ok(Vec::new());
2327 }
2328 runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2329 runtime.check_batch_size(rows.len())?;
2330 runtime.check_db_size()?;
2331
2332 let db = runtime.db();
2333 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2334 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2335
2336 let store = db.store();
2337 let table_arc: Arc<str> = Arc::from(collection.as_str());
2338
2339 let indexed_cols = runtime
2340 .index_store_ref()
2341 .indexed_columns_set(collection.as_str());
2342 let has_secondary_indexes = !indexed_cols.is_empty();
2343 let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2344 if has_secondary_indexes {
2345 Vec::with_capacity(rows.len())
2346 } else {
2347 Vec::new()
2348 };
2349
2350 let entities: Vec<UnifiedEntity> = rows
2351 .into_iter()
2352 .map(|values| {
2353 if has_secondary_indexes {
2354 let mut snap: Vec<(String, crate::storage::schema::Value)> =
2355 Vec::with_capacity(indexed_cols.len());
2356 for (name, val) in column_names.iter().zip(values.iter()) {
2357 if indexed_cols.contains(name) {
2358 snap.push((name.clone(), val.clone()));
2359 }
2360 }
2361 field_snapshots.push(snap);
2362 }
2363 let mut row = RowData::new(values);
2364 row.schema = Some(Arc::clone(&column_names));
2365 UnifiedEntity::new(
2366 EntityId::new(0),
2367 EntityKind::TableRow {
2368 table: Arc::clone(&table_arc),
2369 row_id: 0,
2370 },
2371 EntityData::Row(row),
2372 )
2373 })
2374 .collect();
2375
2376 let ids = store
2377 .bulk_insert(&collection, entities)
2378 .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2379
2380 if has_secondary_indexes {
2381 let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2382 .iter()
2383 .zip(field_snapshots)
2384 .map(|(id, fields)| (*id, fields))
2385 .collect();
2386 runtime
2387 .index_store_ref()
2388 .index_entity_insert_batch(&collection, &index_rows)
2389 .map_err(crate::RedDBError::Internal)?;
2390 }
2391
2392 runtime.invalidate_result_cache();
2393 runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2394
2395 Ok(ids
2396 .into_iter()
2397 .map(|id| CreateEntityOutput { id, entity: None })
2398 .collect())
2399}
2400
2401impl RuntimeEntityPort for RedDBRuntime {
2402 fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2403 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2404 let db = self.db();
2405 let CreateRowInput {
2406 collection,
2407 fields,
2408 metadata: input_metadata,
2409 node_links,
2410 vector_links,
2411 } = input;
2412 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2413 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2414 let mut metadata = input_metadata;
2415 apply_collection_default_ttl(&db, &collection, &mut metadata);
2416 let fields = contract.normalize_insert_fields(fields)?;
2417 contract.enforce_row_uniqueness(&fields, None)?;
2418 let engine = self.mutation_engine();
2420 let result = engine.apply(
2421 collection.clone(),
2422 vec![crate::runtime::mutation::MutationRow {
2423 fields,
2424 metadata,
2425 node_links,
2426 vector_links,
2427 }],
2428 )?;
2429 let id = result.ids[0];
2430 Ok(CreateEntityOutput {
2435 id,
2436 entity: db.store().get(&collection, id),
2437 })
2438 }
2439
2440 fn create_rows_batch(
2441 &self,
2442 input: CreateRowsBatchInput,
2443 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2444 if input.rows.is_empty() {
2445 return Ok(Vec::new());
2446 }
2447 self.check_batch_size(input.rows.len())?;
2448 self.check_db_size()?;
2449
2450 let db = self.db();
2451 let collection = input.collection;
2452 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2453 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2454
2455 let mut prepared_rows = Vec::with_capacity(input.rows.len());
2456 let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2457 for row in input.rows {
2458 if row.collection != collection {
2459 return Err(crate::RedDBError::Query(format!(
2460 "batch row collection mismatch: expected '{}', got '{}'",
2461 collection, row.collection
2462 )));
2463 }
2464
2465 let mut metadata = row.metadata;
2466 apply_collection_default_ttl(&db, &collection, &mut metadata);
2467 let fields = contract.normalize_insert_fields(row.fields)?;
2468 contract.enforce_row_uniqueness(&fields, None)?;
2469 uniqueness_rows.push(fields.clone());
2470 prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2471 }
2472
2473 contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2474
2475 let engine = {
2478 let e = self.mutation_engine();
2479 if input.suppress_events {
2480 e.with_suppress_events()
2481 } else {
2482 e
2483 }
2484 };
2485 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2486 .into_iter()
2487 .map(|(fields, metadata, node_links, vector_links)| {
2488 crate::runtime::mutation::MutationRow {
2489 fields,
2490 metadata,
2491 node_links,
2492 vector_links,
2493 }
2494 })
2495 .collect();
2496
2497 let result = engine
2498 .apply(collection.clone(), mutation_rows)
2499 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2500
2501 let store = db.store();
2502 Ok(result
2503 .ids
2504 .into_iter()
2505 .map(|id| CreateEntityOutput {
2506 id,
2507 entity: store.get(&collection, id),
2508 })
2509 .collect())
2510 }
2511
2512 fn create_rows_batch_prevalidated_columnar(
2513 &self,
2514 collection: String,
2515 column_names: std::sync::Arc<Vec<String>>,
2516 rows: Vec<Vec<crate::storage::schema::Value>>,
2517 ) -> RedDBResult<usize> {
2518 create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2519 .map(|outputs| outputs.len())
2520 }
2521
2522 fn create_rows_batch_columnar(
2523 &self,
2524 collection: String,
2525 column_names: std::sync::Arc<Vec<String>>,
2526 rows: Vec<Vec<crate::storage::schema::Value>>,
2527 ) -> RedDBResult<usize> {
2528 self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2529 .map(|outputs| outputs.len())
2530 }
2531
2532 fn create_rows_batch_columnar_with_outputs(
2533 &self,
2534 collection: String,
2535 column_names: std::sync::Arc<Vec<String>>,
2536 rows: Vec<Vec<crate::storage::schema::Value>>,
2537 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2538 if rows.is_empty() {
2539 return Ok(Vec::new());
2540 }
2541 self.check_batch_size(rows.len())?;
2542 self.check_db_size()?;
2543
2544 let db = self.db();
2545 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2546 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2547
2548 let needs_normalisation = match db.collection_contract(&collection) {
2558 Some(c) => {
2559 c.declared_model == crate::catalog::CollectionModel::Table
2560 && (!c.declared_columns.is_empty()
2561 || c.table_def
2562 .as_ref()
2563 .map(|t| !t.columns.is_empty())
2564 .unwrap_or(false))
2565 }
2566 None => false,
2567 };
2568 if !needs_normalisation {
2569 return create_rows_batch_prevalidated_columnar_with_outputs(
2570 self,
2571 collection,
2572 column_names,
2573 rows,
2574 );
2575 }
2576
2577 let ncols = column_names.len();
2584 let tuple_rows: Vec<CreateRowInput> = rows
2585 .into_iter()
2586 .map(|values| {
2587 let mut fields: Vec<(String, crate::storage::schema::Value)> =
2588 Vec::with_capacity(ncols);
2589 for (name, value) in column_names.iter().zip(values) {
2590 fields.push((name.clone(), value));
2591 }
2592 CreateRowInput {
2593 collection: collection.clone(),
2594 fields,
2595 metadata: Vec::new(),
2596 node_links: Vec::new(),
2597 vector_links: Vec::new(),
2598 }
2599 })
2600 .collect();
2601 self.create_rows_batch(CreateRowsBatchInput {
2602 collection,
2603 rows: tuple_rows,
2604 suppress_events: false,
2605 })
2606 }
2607
2608 fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2609 if input.rows.is_empty() {
2610 return Ok(0);
2611 }
2612 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2613 self.check_batch_size(input.rows.len())?;
2614 self.check_db_size()?;
2615
2616 let db = self.db();
2617 let collection = input.collection;
2618 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2623 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2624
2625 let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2631
2632 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2633 Vec::with_capacity(input.rows.len());
2634 let mut mutation_rows = mutation_rows;
2635 for row in input.rows {
2636 if row.collection != collection {
2637 return Err(crate::RedDBError::Query(format!(
2638 "batch row collection mismatch: expected '{}', got '{}'",
2639 collection, row.collection
2640 )));
2641 }
2642 let mut metadata = row.metadata;
2643 if let Some(ttl) = default_ttl_ms {
2644 if !has_internal_ttl_metadata(&metadata) {
2645 metadata.push((
2646 "_ttl_ms".to_string(),
2647 if ttl <= i64::MAX as u64 {
2648 MetadataValue::Int(ttl as i64)
2649 } else {
2650 MetadataValue::Timestamp(ttl)
2651 },
2652 ));
2653 }
2654 }
2655 mutation_rows.push(crate::runtime::mutation::MutationRow {
2656 fields: row.fields,
2657 metadata,
2658 node_links: row.node_links,
2659 vector_links: row.vector_links,
2660 });
2661 }
2662
2663 let engine = self.mutation_engine();
2664 let result = engine
2665 .apply(collection, mutation_rows)
2666 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2667 Ok(result.ids.len())
2668 }
2669
2670 fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2671 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2672 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2673 input.properties.iter().map(|(key, _)| key.as_str()),
2674 &format!("node '{}'", input.collection),
2675 )?;
2676 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2677 self.create_node_unchecked(input)
2678 }
2679
2680 fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2681 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2682 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2683 input.properties.iter().map(|(key, _)| key.as_str()),
2684 &format!("edge '{}'", input.collection),
2685 )?;
2686 ensure_non_tree_structural_edge_label(&input.label)?;
2687 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2688 self.create_edge_unchecked(input)
2689 }
2690
2691 fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2692 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2693 let db = self.db();
2694 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2695 contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2696 ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2697 let mut metadata = input.metadata;
2698 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2699 let mut builder = db.vector(&input.collection).dense(input.dense);
2700
2701 if let Some(content) = input.content {
2702 builder = builder.content(content);
2703 }
2704
2705 for (key, value) in metadata {
2706 builder = builder.metadata(key, value);
2707 }
2708
2709 if let Some(link_row) = input.link_row {
2710 builder = builder.link_to_table(link_row);
2711 }
2712
2713 if let Some(link_node) = input.link_node {
2714 builder = builder.link_to_node(link_node);
2715 }
2716
2717 let id = builder.save()?;
2718 self.stamp_xmin_if_in_txn(&input.collection, id);
2721 refresh_context_index(&db, &input.collection, id)?;
2722 self.cdc_emit(
2723 crate::replication::cdc::ChangeOperation::Insert,
2724 &input.collection,
2725 id.raw(),
2726 "vector",
2727 );
2728 Ok(CreateEntityOutput {
2729 id,
2730 entity: db.store().get(&input.collection, id),
2731 })
2732 }
2733
2734 fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2735 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2736 let db = self.db();
2737 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2738 contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2739
2740 if let JsonValue::Object(ref map) = input.body {
2741 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2742 map.keys().map(String::as_str),
2743 &format!("document '{}'", input.collection),
2744 )?;
2745 }
2746
2747 let body_bytes = json_to_vec(&input.body).map_err(|err| {
2749 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2750 })?;
2751 let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2752 "body".to_string(),
2753 crate::storage::schema::Value::Json(body_bytes),
2754 )];
2755
2756 if let JsonValue::Object(ref map) = input.body {
2758 for (key, value) in map {
2759 let storage_value = json_to_storage_value(value)?;
2760 fields.push((key.clone(), storage_value));
2761 }
2762 }
2763
2764 let mut metadata = input.metadata;
2765 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2766 let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2767 .iter()
2768 .map(|(key, value)| (key.as_str(), value.clone()))
2769 .collect();
2770 let mut builder = db.row(&input.collection, columns);
2771
2772 for (key, value) in metadata {
2773 builder = builder.metadata(key, value);
2774 }
2775
2776 for node in input.node_links {
2777 builder = builder.link_to_node(node);
2778 }
2779
2780 for vector in input.vector_links {
2781 builder = builder.link_to_vector(vector);
2782 }
2783
2784 let id = builder.save()?;
2785 self.stamp_xmin_if_in_txn(&input.collection, id);
2787 refresh_context_index(&db, &input.collection, id)?;
2788 self.cdc_emit(
2789 crate::replication::cdc::ChangeOperation::Insert,
2790 &input.collection,
2791 id.raw(),
2792 "document",
2793 );
2794 Ok(CreateEntityOutput {
2795 id,
2796 entity: db.store().get(&input.collection, id),
2797 })
2798 }
2799
2800 fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2801 let db = self.db();
2802 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2803 let declared_model = db
2804 .collection_contract(&input.collection)
2805 .map(|contract| contract.declared_model);
2806 let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2807 contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2808 self.seal_vault_value(&input.collection, input.value)?
2809 } else {
2810 contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2811 input.value
2812 };
2813 let fields = vec![
2814 (
2815 "key".to_string(),
2816 crate::storage::schema::Value::text(input.key),
2817 ),
2818 ("value".to_string(), value),
2819 ];
2820 let collection = input.collection;
2821 let result = self.mutation_engine().apply(
2822 collection.clone(),
2823 vec![crate::runtime::mutation::MutationRow {
2824 fields,
2825 metadata: input.metadata,
2826 node_links: Vec::new(),
2827 vector_links: Vec::new(),
2828 }],
2829 )?;
2830 let id = result.ids[0];
2831 Ok(CreateEntityOutput {
2832 id,
2833 entity: db.store().get(&collection, id),
2834 })
2835 }
2836
2837 fn create_timeseries_point(
2838 &self,
2839 input: CreateTimeSeriesPointInput,
2840 ) -> RedDBResult<CreateEntityOutput> {
2841 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2842 let db = self.db();
2843 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2844 contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2845
2846 let mut fields = vec![
2847 (
2848 "metric".to_string(),
2849 crate::storage::schema::Value::text(input.metric),
2850 ),
2851 (
2852 "value".to_string(),
2853 crate::storage::schema::Value::Float(input.value),
2854 ),
2855 ];
2856
2857 if let Some(timestamp_ns) = input.timestamp_ns {
2858 fields.push((
2859 "timestamp".to_string(),
2860 crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2861 ));
2862 }
2863
2864 if !input.tags.is_empty() {
2865 let tags_json = JsonValue::Object(
2866 input
2867 .tags
2868 .into_iter()
2869 .map(|(key, value)| (key, JsonValue::String(value)))
2870 .collect(),
2871 );
2872 let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2873 crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2874 })?;
2875 fields.push((
2876 "tags".to_string(),
2877 crate::storage::schema::Value::Json(tags_bytes),
2878 ));
2879 }
2880
2881 let collection = input.collection;
2882 let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2883 self.stamp_xmin_if_in_txn(&collection, id);
2886 refresh_context_index(&db, &collection, id)?;
2887
2888 Ok(CreateEntityOutput {
2889 id,
2890 entity: db.store().get(&collection, id),
2891 })
2892 }
2893
2894 fn get_kv(
2895 &self,
2896 collection: &str,
2897 key: &str,
2898 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2899 let db = self.db();
2900 ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2901 let store = db.store();
2902 let Some(manager) = store.get_collection(collection) else {
2903 return Ok(None);
2904 };
2905 let entities = manager.query_all(|_| true);
2906 for entity in entities {
2907 if let crate::storage::EntityData::Row(ref row) = entity.data {
2908 if let Some(ref named) = row.named {
2909 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2910 if &**k == key {
2911 let value = named
2912 .get("value")
2913 .cloned()
2914 .unwrap_or(crate::storage::schema::Value::Null);
2915 return Ok(Some((value, entity.id)));
2916 }
2917 }
2918 }
2919 }
2920 }
2921 Ok(None)
2922 }
2923
2924 fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2925 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2926 let found = self.get_kv(collection, key)?;
2927 if let Some((_, id)) = found {
2928 let db = self.db();
2929 let store = db.store();
2930 let deleted = store
2931 .delete(collection, id)
2932 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2933 if deleted {
2934 store.context_index().remove_entity(id);
2935 }
2936 Ok(deleted)
2937 } else {
2938 Ok(false)
2939 }
2940 }
2941
2942 fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2943 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2944 let PatchEntityInput {
2945 collection,
2946 id,
2947 payload,
2948 operations,
2949 } = input;
2950 let db = self.db();
2951 let store = db.store();
2952 let Some(manager) = store.get_collection(&collection) else {
2953 return Err(crate::RedDBError::NotFound(format!(
2954 "collection not found: {collection}"
2955 )));
2956 };
2957 let Some(entity) = manager.get(id) else {
2958 return Err(crate::RedDBError::NotFound(format!(
2959 "entity not found: {}",
2960 id.raw()
2961 )));
2962 };
2963 self.apply_loaded_patch_entity(collection, entity, payload, operations)
2964 }
2965
2966 fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
2967 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2968 let store = self.db().store();
2969 let pre_delete_fields = store
2973 .get(&input.collection, input.id)
2974 .as_ref()
2975 .map(entity_row_fields_snapshot)
2976 .unwrap_or_default();
2977 let deleted = store
2981 .delete(&input.collection, input.id)
2982 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2983 if deleted {
2984 store.context_index().remove_entity(input.id);
2985 if !pre_delete_fields.is_empty() {
2988 self.index_store_ref()
2989 .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
2990 .map_err(crate::RedDBError::Internal)?;
2991 }
2992 self.cdc_emit(
2993 crate::replication::cdc::ChangeOperation::Delete,
2994 &input.collection,
2995 input.id.raw(),
2996 "entity",
2997 );
2998 }
2999 Ok(DeleteEntityOutput {
3000 deleted,
3001 id: input.id,
3002 })
3003 }
3004}