1use std::collections::HashMap;
2
3use crate::application::entity::{
4 AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5 RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8 has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
/// String prefix `"red.tree."`.
// NOTE(review): not referenced in the visible part of this file — presumably
// namespaces tree-related metadata keys; confirm against its usages.
const TREE_METADATA_PREFIX: &str = "red.tree.";
/// Label string `"TREE_CHILD"`.
// NOTE(review): not referenced in the visible part of this file — presumably
// the label of parent-to-child tree edges; confirm against its usages.
const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21 db: &crate::storage::unified::devx::RedDB,
22 collection: &str,
23 metadata: &mut Vec<(String, MetadataValue)>,
24) {
25 if has_internal_ttl_metadata(metadata) {
26 return;
27 }
28
29 let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30 return;
31 };
32
33 metadata.push((
34 "_ttl_ms".to_string(),
35 if default_ttl_ms <= i64::MAX as u64 {
36 MetadataValue::Int(default_ttl_ms as i64)
37 } else {
38 MetadataValue::Timestamp(default_ttl_ms)
39 },
40 ));
41}
42
43fn refresh_context_index(
44 db: &crate::storage::unified::devx::RedDB,
45 collection: &str,
46 id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48 let store = db.store();
49 let Some(entity) = store.get(collection, id) else {
50 return Ok(());
51 };
52
53 store.context_index().index_entity(collection, &entity);
54 Ok(())
55}
56
57pub(crate) fn entity_row_fields_snapshot(
63 entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65 let crate::storage::EntityData::Row(row) = &entity.data else {
66 return Vec::new();
67 };
68 if let Some(named) = &row.named {
69 return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70 }
71 if let Some(schema) = &row.schema {
72 return schema
73 .iter()
74 .zip(row.columns.iter())
75 .map(|(name, value)| (name.clone(), value.clone()))
76 .collect();
77 }
78 Vec::new()
79}
80
81fn ensure_collection_model_contract(
82 db: &crate::storage::unified::devx::RedDB,
83 collection: &str,
84 requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86 if let Some(contract) = db.collection_contract(collection) {
87 if !contract_enforces_model(&contract) {
88 return Ok(());
89 }
90 if collection_model_allows(contract.declared_model, requested_model) {
91 return Ok(());
92 }
93 return Err(crate::RedDBError::InvalidOperation(format!(
94 "collection '{}' is declared as '{}' and does not allow '{}' writes",
95 collection,
96 collection_model_name(contract.declared_model),
97 collection_model_name(requested_model)
98 )));
99 }
100
101 let now = implicit_contract_unix_ms();
102 db.save_collection_contract(crate::physical::CollectionContract {
103 name: collection.to_string(),
104 declared_model: requested_model,
105 schema_mode: crate::catalog::SchemaMode::Dynamic,
106 origin: crate::physical::ContractOrigin::Implicit,
107 version: 1,
108 created_at_unix_ms: now,
109 updated_at_unix_ms: now,
110 default_ttl_ms: db.collection_default_ttl_ms(collection),
111 vector_dimension: None,
112 vector_metric: None,
113 context_index_fields: Vec::new(),
114 declared_columns: Vec::new(),
115 table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116 .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117 timestamps_enabled: false,
118 context_index_enabled: false,
119 metrics_raw_retention_ms: None,
120 metrics_rollup_policies: Vec::new(),
121 metrics_tenant_identity: None,
122 metrics_namespace: None,
123 append_only: false,
126 subscriptions: Vec::new(),
127 session_key: None,
128 session_gap_ms: None,
129 retention_duration_ms: None,
130 })
131 .map(|_| ())
132 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
133}
134
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the clock reports a time before the epoch.
fn implicit_contract_unix_ms() -> u128 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
141
142fn collection_model_allows(
143 declared_model: crate::catalog::CollectionModel,
144 requested_model: crate::catalog::CollectionModel,
145) -> bool {
146 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
147}
148
149fn ensure_vector_dimension_contract(
150 db: &crate::storage::unified::devx::RedDB,
151 collection: &str,
152 actual_dimension: usize,
153) -> RedDBResult<()> {
154 let Some(expected_dimension) = db
155 .collection_contract(collection)
156 .and_then(|contract| contract.vector_dimension)
157 else {
158 return Ok(());
159 };
160 if expected_dimension == actual_dimension {
161 return Ok(());
162 }
163 Err(crate::RedDBError::Query(format!(
164 "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
165 )))
166}
167
168fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
169 match model {
170 crate::catalog::CollectionModel::Table => "table",
171 crate::catalog::CollectionModel::Document => "document",
172 crate::catalog::CollectionModel::Graph => "graph",
173 crate::catalog::CollectionModel::Vector => "vector",
174 crate::catalog::CollectionModel::Hll => "hll",
175 crate::catalog::CollectionModel::Sketch => "sketch",
176 crate::catalog::CollectionModel::Filter => "filter",
177 crate::catalog::CollectionModel::Kv => "kv",
178 crate::catalog::CollectionModel::Config => "config",
179 crate::catalog::CollectionModel::Vault => "vault",
180 crate::catalog::CollectionModel::Mixed => "mixed",
181 crate::catalog::CollectionModel::TimeSeries => "timeseries",
182 crate::catalog::CollectionModel::Queue => "queue",
183 crate::catalog::CollectionModel::Metrics => "metrics",
184 }
185}
186
/// One uniqueness constraint resolved from a collection contract.
#[derive(Clone)]
struct UniquenessRule {
    /// Constraint name, quoted in violation error messages.
    name: String,
    /// Columns whose combined values must be unique across rows.
    columns: Vec<String>,
    /// `true` for primary keys: a missing or null column is then an error,
    /// whereas for plain unique rules such rows are skipped by the checks.
    primary_key: bool,
}
193
/// Selects how row normalization treats the managed timestamp columns.
#[derive(Copy, Clone, PartialEq, Eq)]
enum NormalizeMode {
    /// With timestamps enabled, caller-supplied `created_at` / `updated_at`
    /// fields are rejected.
    Insert,
    /// With timestamps enabled, a supplied `created_at` is carried over as-is
    /// and `updated_at` is overwritten with the current time.
    Update,
}
205
/// Bundles the per-collection contract checks behind a single handle.
mod collection_contract_enforcement {
    use super::*;

    /// Borrows a database and a collection name so callers can run the
    /// contract checks without repeating both arguments. Every method simply
    /// delegates to the free function of the parent module.
    pub(super) struct CollectionContractWriteEnforcer<'a> {
        db: &'a crate::storage::unified::devx::RedDB,
        collection: &'a str,
    }

    impl<'a> CollectionContractWriteEnforcer<'a> {
        /// Creates an enforcer for `collection` in `db`.
        pub(super) fn new(
            db: &'a crate::storage::unified::devx::RedDB,
            collection: &'a str,
        ) -> Self {
            Self { db, collection }
        }

        /// Delegates to `ensure_collection_model_contract`.
        pub(super) fn ensure_model(
            &self,
            requested_model: crate::catalog::CollectionModel,
        ) -> RedDBResult<()> {
            ensure_collection_model_contract(self.db, self.collection, requested_model)
        }

        /// Normalizes `fields` against the contract in `Insert` mode.
        pub(super) fn normalize_insert_fields(
            &self,
            fields: Vec<(String, Value)>,
        ) -> RedDBResult<Vec<(String, Value)>> {
            normalize_row_fields_for_contract_with_mode(
                self.db,
                self.collection,
                fields,
                NormalizeMode::Insert,
            )
        }

        /// Normalizes `fields` against the contract in `Update` mode.
        pub(super) fn normalize_update_fields(
            &self,
            fields: Vec<(String, Value)>,
        ) -> RedDBResult<Vec<(String, Value)>> {
            normalize_row_fields_for_contract_with_mode(
                self.db,
                self.collection,
                fields,
                NormalizeMode::Update,
            )
        }

        /// Delegates to `enforce_row_uniqueness`; `exclude_id` names an
        /// entity that is ignored during the duplicate scan.
        pub(super) fn enforce_row_uniqueness(
            &self,
            fields: &[(String, Value)],
            exclude_id: Option<crate::storage::EntityId>,
        ) -> RedDBResult<()> {
            enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
        }

        /// Delegates to `enforce_row_batch_uniqueness`, which compares the
        /// rows of one batch against each other.
        pub(super) fn enforce_batch_uniqueness(
            &self,
            rows: &[Vec<(String, Value)>],
        ) -> RedDBResult<()> {
            enforce_row_batch_uniqueness(self.db, self.collection, rows)
        }

        /// Delegates to `row_update_requires_uniqueness_check`.
        pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
            row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
        }
    }
}
273
274use collection_contract_enforcement::CollectionContractWriteEnforcer;
275
/// Normalizes row `fields` against the declared columns of `collection`.
///
/// `fields` is returned untouched when the collection has no contract, is not
/// declared as a table, or declares no columns. Otherwise the result lists the
/// declared columns first (in declaration order, coerced to their declared
/// types, with defaults filled in), followed by any undeclared fields in
/// their original order.
///
/// # Errors
///
/// Returns a `Query` error when:
/// - timestamps are enabled and an insert supplies `created_at`/`updated_at`;
/// - the schema is strict and undeclared fields are present;
/// - a value cannot be coerced to the declared type, or a not-null column is
///   set to null;
/// - a not-null column without a default is missing.
fn normalize_row_fields_for_contract_with_mode(
    db: &crate::storage::unified::devx::RedDB,
    collection: &str,
    fields: Vec<(String, Value)>,
    mode: NormalizeMode,
) -> RedDBResult<Vec<(String, Value)>> {
    let Some(contract) = db.collection_contract(collection) else {
        return Ok(fields);
    };

    // Only table contracts that actually declare columns are normalized.
    if contract.declared_model != crate::catalog::CollectionModel::Table
        || (contract.declared_columns.is_empty()
            && contract
                .table_def
                .as_ref()
                .map(|table| table.columns.is_empty())
                .unwrap_or(true))
    {
        return Ok(fields);
    }

    // On update, remember the supplied `created_at` so it can be carried over
    // unchanged instead of being coerced or replaced.
    let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
        fields
            .iter()
            .find(|(n, _)| n == "created_at")
            .map(|(_, v)| v.clone())
    } else {
        None
    };

    // On insert, the managed timestamp columns must not be supplied at all.
    if contract.timestamps_enabled && mode == NormalizeMode::Insert {
        for (name, _) in &fields {
            if name == "created_at" || name == "updated_at" {
                return Err(crate::RedDBError::Query(format!(
                    "collection '{}' manages '{}' automatically — do not set it in INSERT",
                    collection, name
                )));
            }
        }
    }

    // Lookup of supplied values by name; a repeated name keeps its last value.
    let mut provided = std::collections::BTreeMap::new();
    for (name, value) in &fields {
        provided.insert(name.clone(), value.clone());
    }

    let resolved_columns = resolved_contract_columns(&contract)?;
    let declared_names: std::collections::BTreeSet<String> = resolved_columns
        .iter()
        .map(|column| column.name.clone())
        .collect();
    let unknown_fields: Vec<String> = fields
        .iter()
        .filter(|(name, _)| !declared_names.contains(name))
        .map(|(name, _)| name.clone())
        .collect();
    if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
        && !unknown_fields.is_empty()
    {
        return Err(crate::RedDBError::Query(format!(
            "collection '{}' is strict and does not allow undeclared fields: {}",
            collection,
            unknown_fields.join(", ")
        )));
    }
    let mut normalized = Vec::new();
    let now_ms = current_unix_ms_u64();

    // Declared columns are emitted first, in declaration order.
    for column in &resolved_columns {
        match provided.remove(&column.name) {
            Some(value) => {
                if contract.timestamps_enabled && mode == NormalizeMode::Update {
                    match column.name.as_str() {
                        "created_at" => {
                            // Keep the supplied creation time verbatim.
                            normalized.push((
                                column.name.clone(),
                                existing_created_at
                                    .clone()
                                    .unwrap_or(Value::UnsignedInteger(now_ms)),
                            ));
                            continue;
                        }
                        "updated_at" => {
                            // Every update refreshes the modification time.
                            normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
                            continue;
                        }
                        _ => {}
                    }
                }
                normalized.push((
                    column.name.clone(),
                    normalize_contract_value(collection, column, value)?,
                ));
            }
            None => {
                // Missing managed timestamps are stamped with the current
                // time, in either mode.
                if contract.timestamps_enabled
                    && (column.name == "created_at" || column.name == "updated_at")
                {
                    normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
                    continue;
                }
                if let Some(default) = &column.default {
                    normalized.push((
                        column.name.clone(),
                        coerce_contract_literal(collection, &column.name, column, default)?,
                    ));
                } else if column.not_null {
                    return Err(crate::RedDBError::Query(format!(
                        "missing required column '{}' for collection '{}'",
                        column.name, collection
                    )));
                }
                // A missing nullable column without a default is omitted.
            }
        }
    }

    // Undeclared fields (only reachable for non-strict schemas) pass through
    // unmodified after the declared columns.
    for (name, value) in fields {
        if !declared_names.contains(&name) {
            normalized.push((name, value));
        }
    }

    Ok(normalized)
}
420
/// Milliseconds elapsed since the Unix epoch, narrowed to `u64`.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn current_unix_ms_u64() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
427
428fn enforce_row_uniqueness(
429 db: &crate::storage::unified::devx::RedDB,
430 collection: &str,
431 fields: &[(String, Value)],
432 exclude_id: Option<crate::storage::EntityId>,
433) -> RedDBResult<()> {
434 let Some(contract) = db.collection_contract(collection) else {
435 return Ok(());
436 };
437 if contract.declared_model != crate::catalog::CollectionModel::Table
438 && contract.declared_model != crate::catalog::CollectionModel::Mixed
439 {
440 return Ok(());
441 }
442
443 let rules = resolved_uniqueness_rules(&contract);
444 if rules.is_empty() {
445 return Ok(());
446 }
447
448 let store = db.store();
449 let Some(manager) = store.get_collection(collection) else {
450 return Ok(());
451 };
452
453 let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
454
455 for rule in &rules {
456 let mut expected_signatures = Vec::new();
457 let mut skip_rule = false;
458
459 for column in &rule.columns {
460 match input_fields.get(column) {
461 Some(Value::Null) | None if rule.primary_key => {
462 return Err(crate::RedDBError::Query(format!(
463 "primary key '{}' in collection '{}' requires non-null column '{}'",
464 rule.name, collection, column
465 )))
466 }
467 Some(Value::Null) | None => {
468 skip_rule = true;
469 break;
470 }
471 Some(value) => {
472 expected_signatures.push((column.clone(), value_signature(value)));
473 }
474 }
475 }
476
477 if skip_rule {
478 continue;
479 }
480
481 for entity in manager.query_all(|_| true) {
482 if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
483 continue;
484 }
485 let Some(existing_fields) = row_fields_from_entity(&entity) else {
486 continue;
487 };
488
489 let duplicate = expected_signatures.iter().all(|(column, expected)| {
490 existing_fields
491 .get(column)
492 .map(|value| value_signature(value) == *expected)
493 .unwrap_or(false)
494 });
495
496 if duplicate {
497 let qualifier = if rule.primary_key {
498 "primary key"
499 } else {
500 "unique constraint"
501 };
502 return Err(crate::RedDBError::Query(format!(
503 "{} '{}' violated on collection '{}' for columns [{}]",
504 qualifier,
505 rule.name,
506 collection,
507 rule.columns.join(", ")
508 )));
509 }
510 }
511 }
512
513 Ok(())
514}
515
516fn enforce_row_batch_uniqueness(
517 db: &crate::storage::unified::devx::RedDB,
518 collection: &str,
519 rows: &[Vec<(String, Value)>],
520) -> RedDBResult<()> {
521 let Some(contract) = db.collection_contract(collection) else {
522 return Ok(());
523 };
524 if contract.declared_model != crate::catalog::CollectionModel::Table
525 && contract.declared_model != crate::catalog::CollectionModel::Mixed
526 {
527 return Ok(());
528 }
529
530 let rules = resolved_uniqueness_rules(&contract);
531 if rules.is_empty() {
532 return Ok(());
533 }
534
535 for rule in &rules {
536 let mut seen = std::collections::HashMap::<String, usize>::new();
537 for (row_index, fields) in rows.iter().enumerate() {
538 let input_fields: std::collections::BTreeMap<String, Value> =
539 fields.iter().cloned().collect();
540 let mut signatures = Vec::new();
541 let mut skip_rule = false;
542
543 for column in &rule.columns {
544 match input_fields.get(column) {
545 Some(Value::Null) | None if rule.primary_key => {
546 return Err(crate::RedDBError::Query(format!(
547 "primary key '{}' in collection '{}' requires non-null column '{}'",
548 rule.name, collection, column
549 )))
550 }
551 Some(Value::Null) | None => {
552 skip_rule = true;
553 break;
554 }
555 Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
556 }
557 }
558
559 if skip_rule {
560 continue;
561 }
562
563 let signature = signatures.join("|");
564 if let Some(previous_index) = seen.insert(signature, row_index) {
565 return Err(crate::RedDBError::Query(format!(
566 "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
567 rule.name,
568 collection,
569 previous_index + 1,
570 row_index + 1
571 )));
572 }
573 }
574 }
575
576 Ok(())
577}
578
579fn row_update_requires_uniqueness_check(
580 db: &crate::storage::unified::devx::RedDB,
581 collection: &str,
582 modified_columns: &[String],
583) -> bool {
584 if modified_columns.is_empty() {
585 return false;
586 }
587
588 let Some(contract) = db.collection_contract(collection) else {
589 return false;
590 };
591 if contract.declared_model != crate::catalog::CollectionModel::Table
592 && contract.declared_model != crate::catalog::CollectionModel::Mixed
593 {
594 return false;
595 }
596
597 let rules = resolved_uniqueness_rules(&contract);
598 if rules.is_empty() {
599 return false;
600 }
601
602 rules.iter().any(|rule| {
603 rule.columns.iter().any(|column| {
604 modified_columns
605 .iter()
606 .any(|modified| modified.eq_ignore_ascii_case(column))
607 })
608 })
609}
610
611pub(crate) fn build_row_update_contract_plan(
612 db: &crate::storage::unified::devx::RedDB,
613 collection: &str,
614) -> RedDBResult<Option<RowUpdateContractPlan>> {
615 let Some(contract) = db.collection_contract(collection) else {
616 return Ok(None);
617 };
618
619 let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
620 && !(contract.declared_columns.is_empty()
621 && contract
622 .table_def
623 .as_ref()
624 .map(|table| table.columns.is_empty())
625 .unwrap_or(true))
626 {
627 resolved_contract_columns(&contract)?
628 .into_iter()
629 .map(|rule| {
630 (
631 rule.name.clone(),
632 RowUpdateColumnRule {
633 name: rule.name,
634 data_type: rule.data_type,
635 data_type_name: rule.data_type_name,
636 not_null: rule.not_null,
637 enum_variants: rule.enum_variants,
638 },
639 )
640 })
641 .collect()
642 } else {
643 HashMap::new()
644 };
645
646 let unique_columns = if matches!(
647 contract.declared_model,
648 crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
649 ) {
650 resolved_uniqueness_rules(&contract)
651 .into_iter()
652 .flat_map(|rule| rule.columns.into_iter())
653 .map(|column| (column, ()))
654 .collect()
655 } else {
656 HashMap::new()
657 };
658
659 Ok(Some(RowUpdateContractPlan {
660 timestamps_enabled: contract.timestamps_enabled,
661 strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
662 declared_rules,
663 unique_columns,
664 }))
665}
666
667pub(crate) fn normalize_row_update_assignment_with_plan(
668 collection: &str,
669 column: &str,
670 value: Value,
671 row_contract_plan: Option<&RowUpdateContractPlan>,
672) -> RedDBResult<Value> {
673 let Some(plan) = row_contract_plan else {
674 return Ok(value);
675 };
676
677 if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
678 return Err(crate::RedDBError::Query(format!(
679 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
680 collection, column
681 )));
682 }
683
684 if let Some(rule) = plan.declared_rules.get(column) {
685 let rule = ResolvedColumnRule {
686 name: rule.name.clone(),
687 data_type: rule.data_type,
688 data_type_name: rule.data_type_name.clone(),
689 not_null: rule.not_null,
690 default: None,
691 enum_variants: rule.enum_variants.clone(),
692 };
693 normalize_contract_value(collection, &rule, value)
694 } else if plan.strict_schema {
695 Err(crate::RedDBError::Query(format!(
696 "collection '{}' is strict and does not allow undeclared fields: {}",
697 collection, column
698 )))
699 } else {
700 Ok(value)
701 }
702}
703
704pub(crate) fn normalize_row_update_value_for_rule(
705 collection: &str,
706 value: Value,
707 row_rule: Option<&RowUpdateColumnRule>,
708) -> RedDBResult<Value> {
709 let Some(rule) = row_rule else {
710 return Ok(value);
711 };
712
713 let rule = ResolvedColumnRule {
714 name: rule.name.clone(),
715 data_type: rule.data_type,
716 data_type_name: rule.data_type_name.clone(),
717 not_null: rule.not_null,
718 default: None,
719 enum_variants: rule.enum_variants.clone(),
720 };
721 normalize_contract_value(collection, &rule, value)
722}
723
724fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
725 if let Some(named) = row.named.as_mut() {
726 named.insert(name.to_string(), value);
727 return;
728 }
729
730 if let Some(schema) = row.schema.as_ref() {
731 if let Some(index) = schema.iter().position(|column| column == name) {
732 if let Some(slot) = row.columns.get_mut(index) {
733 *slot = value;
734 return;
735 }
736 }
737
738 let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
739 for (column, current) in schema.iter().zip(row.columns.iter()) {
740 named.insert(column.clone(), current.clone());
741 }
742 named.insert(name.to_string(), value);
743 row.named = Some(named);
744 return;
745 }
746
747 let mut named = HashMap::with_capacity(1);
748 named.insert(name.to_string(), value);
749 row.named = Some(named);
750}
751
752fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
753 if let Some(named) = row.named.as_ref() {
754 named
755 .iter()
756 .map(|(key, value)| (key.clone(), value.clone()))
757 .collect()
758 } else if let Some(schema) = row.schema.as_ref() {
759 schema
760 .iter()
761 .cloned()
762 .zip(row.columns.iter().cloned())
763 .collect()
764 } else {
765 Vec::new()
766 }
767}
768
769fn apply_row_field_assignments_raw<I>(
770 row: &mut crate::storage::unified::entity::RowData,
771 field_assignments: I,
772) where
773 I: IntoIterator<Item = (String, Value)>,
774{
775 for (column, value) in field_assignments {
776 set_row_field(row, &column, value);
777 }
778}
779
780fn apply_row_field_assignments_incremental<I>(
781 collection: &str,
782 row: &mut crate::storage::unified::entity::RowData,
783 field_assignments: I,
784 row_contract_plan: Option<&RowUpdateContractPlan>,
785) -> RedDBResult<()>
786where
787 I: IntoIterator<Item = (String, Value)>,
788{
789 for (column, value) in field_assignments {
790 let value = normalize_row_update_assignment_with_plan(
791 collection,
792 &column,
793 value,
794 row_contract_plan,
795 )?;
796
797 set_row_field(row, &column, value);
798 }
799
800 Ok(())
801}
802
803fn resolved_uniqueness_rules(
804 contract: &crate::physical::CollectionContract,
805) -> Vec<UniquenessRule> {
806 let mut rules = Vec::new();
807
808 if let Some(table_def) = &contract.table_def {
809 if !table_def.primary_key.is_empty() {
810 rules.push(UniquenessRule {
811 name: "primary_key".to_string(),
812 columns: table_def.primary_key.clone(),
813 primary_key: true,
814 });
815 }
816
817 for constraint in &table_def.constraints {
818 if matches!(
819 constraint.constraint_type,
820 crate::storage::schema::ConstraintType::PrimaryKey
821 ) && !constraint.columns.is_empty()
822 {
823 rules.push(UniquenessRule {
824 name: constraint.name.clone(),
825 columns: constraint.columns.clone(),
826 primary_key: true,
827 });
828 } else if matches!(
829 constraint.constraint_type,
830 crate::storage::schema::ConstraintType::Unique
831 ) && !constraint.columns.is_empty()
832 {
833 rules.push(UniquenessRule {
834 name: constraint.name.clone(),
835 columns: constraint.columns.clone(),
836 primary_key: false,
837 });
838 }
839 }
840 } else {
841 for column in &contract.declared_columns {
842 if column.primary_key {
843 rules.push(UniquenessRule {
844 name: format!("pk_{}", column.name),
845 columns: vec![column.name.clone()],
846 primary_key: true,
847 });
848 } else if column.unique {
849 rules.push(UniquenessRule {
850 name: format!("uniq_{}", column.name),
851 columns: vec![column.name.clone()],
852 primary_key: false,
853 });
854 }
855 }
856 }
857
858 let mut dedup = std::collections::BTreeSet::new();
859 rules
860 .into_iter()
861 .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
862 .collect()
863}
864
865fn row_fields_from_entity(
866 entity: &crate::storage::UnifiedEntity,
867) -> Option<std::collections::BTreeMap<String, Value>> {
868 match &entity.data {
869 crate::storage::EntityData::Row(row) => {
870 if let Some(named) = &row.named {
871 Some(
872 named
873 .iter()
874 .map(|(key, value)| (key.clone(), value.clone()))
875 .collect(),
876 )
877 } else {
878 row.schema.as_ref().map(|schema| {
879 schema
880 .iter()
881 .cloned()
882 .zip(row.columns.iter().cloned())
883 .collect()
884 })
885 }
886 }
887 _ => None,
888 }
889}
890
891fn value_signature(value: &Value) -> String {
892 format!("{value:?}")
893}
894
895fn normalize_contract_value(
896 collection: &str,
897 column: &ResolvedColumnRule,
898 value: Value,
899) -> RedDBResult<Value> {
900 if matches!(value, Value::Null) {
901 if column.not_null {
902 return Err(crate::RedDBError::Query(format!(
903 "column '{}' in collection '{}' cannot be null",
904 column.name, collection
905 )));
906 }
907 return Ok(Value::Null);
908 }
909
910 let target = column.data_type;
911 if value_matches_declared_type(&value, target) {
912 return Ok(value);
913 }
914
915 let Some(raw) = value_to_coercion_input(&value) else {
916 return Err(crate::RedDBError::Query(format!(
917 "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
918 column.name, collection, column.data_type_name
919 )));
920 };
921
922 coerce_contract_literal(collection, &column.name, column, &raw)
923}
924
925fn coerce_contract_literal(
926 collection: &str,
927 column_name: &str,
928 column: &ResolvedColumnRule,
929 raw: &str,
930) -> RedDBResult<Value> {
931 let target = column.data_type;
932 match target {
933 DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
934 DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
935 DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
936 crate::RedDBError::Query(format!(
937 "failed to coerce column '{}' in collection '{}' to '{}': {}",
938 column_name, collection, column.data_type_name, err
939 ))
940 }),
941 DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
942 crate::RedDBError::Query(format!(
943 "failed to coerce column '{}' in collection '{}' to '{}': {}",
944 column_name, collection, column.data_type_name, err
945 ))
946 }),
947 DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
948 "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
949 column_name, collection, column.data_type_name
950 ))),
951 _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
952 |err| {
953 crate::RedDBError::Query(format!(
954 "failed to coerce column '{}' in collection '{}' to '{}': {}",
955 column_name, collection, column.data_type_name, err
956 ))
957 },
958 ),
959 }
960}
961
/// Column description resolved from a collection contract, in the shape the
/// value normalization helpers consume.
struct ResolvedColumnRule {
    /// Column name.
    name: String,
    /// Declared type that values are checked against and coerced to.
    data_type: DataType,
    /// Type label quoted in coercion error messages.
    data_type_name: String,
    /// When `true`, null values (and missing values without a default) are
    /// rejected.
    not_null: bool,
    /// Textual default literal, coerced to `data_type` when the column is
    /// missing from a row being normalized.
    default: Option<String>,
    /// Enum variants handed to the schema coercion routine.
    enum_variants: Vec<String>,
}
970
971fn resolved_contract_columns(
972 contract: &crate::physical::CollectionContract,
973) -> RedDBResult<Vec<ResolvedColumnRule>> {
974 if let Some(table_def) = &contract.table_def {
975 return Ok(table_def
976 .columns
977 .iter()
978 .map(|column| ResolvedColumnRule {
979 name: column.name.clone(),
980 data_type: column.data_type,
981 data_type_name: data_type_name(column.data_type).to_string(),
982 not_null: !column.nullable,
983 default: column
984 .default
985 .as_ref()
986 .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
987 enum_variants: column.enum_variants.clone(),
988 })
989 .collect());
990 }
991
992 contract
993 .declared_columns
994 .iter()
995 .map(|column| {
996 let data_type = column
997 .sql_type
998 .as_ref()
999 .map(crate::storage::query::resolve_sql_type_name)
1000 .transpose()
1001 .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1002 .unwrap_or(parse_declared_data_type(&column.data_type)?);
1003 Ok(ResolvedColumnRule {
1004 name: column.name.clone(),
1005 data_type,
1006 data_type_name: column.data_type.clone(),
1007 not_null: column.not_null,
1008 default: column.default.clone(),
1009 enum_variants: column.enum_variants.clone(),
1010 })
1011 })
1012 .collect()
1013}
1014
1015fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1016 resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1017}
1018
1019fn data_type_name(data_type: DataType) -> &'static str {
1020 match data_type {
1021 DataType::Integer => "integer",
1022 DataType::UnsignedInteger => "unsigned_integer",
1023 DataType::Float => "float",
1024 DataType::Text => "text",
1025 DataType::Blob => "blob",
1026 DataType::Boolean => "boolean",
1027 DataType::Timestamp => "timestamp",
1028 DataType::Duration => "duration",
1029 DataType::IpAddr => "ipaddr",
1030 DataType::MacAddr => "macaddr",
1031 DataType::Vector => "vector",
1032 DataType::Nullable => "nullable",
1033 DataType::Unknown => "unknown",
1034 DataType::Json => "json",
1035 DataType::Uuid => "uuid",
1036 DataType::NodeRef => "noderef",
1037 DataType::EdgeRef => "edgeref",
1038 DataType::VectorRef => "vectorref",
1039 DataType::RowRef => "rowref",
1040 DataType::Color => "color",
1041 DataType::Email => "email",
1042 DataType::Url => "url",
1043 DataType::Phone => "phone",
1044 DataType::Semver => "semver",
1045 DataType::Cidr => "cidr",
1046 DataType::Date => "date",
1047 DataType::Time => "time",
1048 DataType::Decimal => "decimal",
1049 DataType::Enum => "enum",
1050 DataType::Array => "array",
1051 DataType::TimestampMs => "timestamp_ms",
1052 DataType::Ipv4 => "ipv4",
1053 DataType::Ipv6 => "ipv6",
1054 DataType::Subnet => "subnet",
1055 DataType::Port => "port",
1056 DataType::Latitude => "latitude",
1057 DataType::Longitude => "longitude",
1058 DataType::GeoPoint => "geopoint",
1059 DataType::Country2 => "country2",
1060 DataType::Country3 => "country3",
1061 DataType::Lang2 => "lang2",
1062 DataType::Lang5 => "lang5",
1063 DataType::Currency => "currency",
1064 DataType::AssetCode => "asset_code",
1065 DataType::Money => "money",
1066 DataType::ColorAlpha => "color_alpha",
1067 DataType::BigInt => "bigint",
1068 DataType::KeyRef => "keyref",
1069 DataType::DocRef => "docref",
1070 DataType::TableRef => "tableref",
1071 DataType::PageRef => "pageref",
1072 DataType::Secret => "secret",
1073 DataType::Password => "password",
1074 DataType::TextZstd => "text",
1075 DataType::BlobZstd => "blob",
1076 }
1077}
1078
1079fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1080 matches!(
1081 (value, target),
1082 (Value::Null, _)
1083 | (Value::Integer(_), DataType::Integer)
1084 | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1085 | (Value::Float(_), DataType::Float)
1086 | (Value::Text(_), DataType::Text)
1087 | (Value::Blob(_), DataType::Blob)
1088 | (Value::Boolean(_), DataType::Boolean)
1089 | (Value::Timestamp(_), DataType::Timestamp)
1090 | (Value::Duration(_), DataType::Duration)
1091 | (Value::IpAddr(_), DataType::IpAddr)
1092 | (Value::MacAddr(_), DataType::MacAddr)
1093 | (Value::Vector(_), DataType::Vector)
1094 | (Value::Json(_), DataType::Json)
1095 | (Value::Uuid(_), DataType::Uuid)
1096 | (Value::NodeRef(_), DataType::NodeRef)
1097 | (Value::EdgeRef(_), DataType::EdgeRef)
1098 | (Value::VectorRef(_, _), DataType::VectorRef)
1099 | (Value::RowRef(_, _), DataType::RowRef)
1100 | (Value::Color(_), DataType::Color)
1101 | (Value::Email(_), DataType::Email)
1102 | (Value::Url(_), DataType::Url)
1103 | (Value::Phone(_), DataType::Phone)
1104 | (Value::Semver(_), DataType::Semver)
1105 | (Value::Cidr(_, _), DataType::Cidr)
1106 | (Value::Date(_), DataType::Date)
1107 | (Value::Time(_), DataType::Time)
1108 | (Value::Decimal(_), DataType::Decimal)
1109 | (Value::EnumValue(_), DataType::Enum)
1110 | (Value::Array(_), DataType::Array)
1111 | (Value::TimestampMs(_), DataType::TimestampMs)
1112 | (Value::Ipv4(_), DataType::Ipv4)
1113 | (Value::Ipv6(_), DataType::Ipv6)
1114 | (Value::Subnet(_, _), DataType::Subnet)
1115 | (Value::Port(_), DataType::Port)
1116 | (Value::Latitude(_), DataType::Latitude)
1117 | (Value::Longitude(_), DataType::Longitude)
1118 | (Value::GeoPoint(_, _), DataType::GeoPoint)
1119 | (Value::Country2(_), DataType::Country2)
1120 | (Value::Country3(_), DataType::Country3)
1121 | (Value::Lang2(_), DataType::Lang2)
1122 | (Value::Lang5(_), DataType::Lang5)
1123 | (Value::Currency(_), DataType::Currency)
1124 | (Value::ColorAlpha(_), DataType::ColorAlpha)
1125 | (Value::BigInt(_), DataType::BigInt)
1126 | (Value::KeyRef(_, _), DataType::KeyRef)
1127 | (Value::DocRef(_, _), DataType::DocRef)
1128 | (Value::TableRef(_), DataType::TableRef)
1129 | (Value::PageRef(_), DataType::PageRef)
1130 | (Value::Secret(_), DataType::Secret)
1131 | (Value::Password(_), DataType::Password)
1132 )
1133}
1134
1135fn value_to_coercion_input(value: &Value) -> Option<String> {
1136 match value {
1137 Value::Null => None,
1138 Value::Integer(value) => Some(value.to_string()),
1139 Value::UnsignedInteger(value) => Some(value.to_string()),
1140 Value::Float(value) => Some(value.to_string()),
1141 Value::Text(value) => Some(value.to_string()),
1142 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1143 Value::Boolean(value) => Some(value.to_string()),
1144 Value::Timestamp(value) => Some(value.to_string()),
1145 Value::Duration(value) => Some(value.to_string()),
1146 Value::IpAddr(value) => Some(value.to_string()),
1147 Value::MacAddr(value) => Some(format!(
1148 "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1149 value[0], value[1], value[2], value[3], value[4], value[5]
1150 )),
1151 Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1152 Value::Email(value) => Some(value.clone()),
1153 Value::Url(value) => Some(value.clone()),
1154 Value::Phone(value) => Some(value.to_string()),
1155 Value::Semver(value) => Some(format!(
1156 "{}.{}.{}",
1157 value / 1_000_000,
1158 (value / 1_000) % 1_000,
1159 value % 1_000
1160 )),
1161 Value::Date(value) => Some(value.to_string()),
1162 Value::Time(value) => Some(value.to_string()),
1163 Value::Decimal(value) => Some(value.to_string()),
1164 Value::TimestampMs(value) => Some(value.to_string()),
1165 Value::Ipv4(value) => Some(format!(
1166 "{}.{}.{}.{}",
1167 (value >> 24) & 0xFF,
1168 (value >> 16) & 0xFF,
1169 (value >> 8) & 0xFF,
1170 value & 0xFF
1171 )),
1172 Value::Port(value) => Some(value.to_string()),
1173 Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1174 Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1175 Value::GeoPoint(lat, lon) => Some(format!(
1176 "{},{}",
1177 *lat as f64 / 1_000_000.0,
1178 *lon as f64 / 1_000_000.0
1179 )),
1180 Value::BigInt(value) => Some(value.to_string()),
1181 Value::TableRef(value) => Some(value.clone()),
1182 Value::PageRef(value) => Some(value.to_string()),
1183 Value::Password(value) => Some(value.clone()),
1184 _ => None,
1185 }
1186}
1187
/// Removes duplicate column names, comparing ASCII case-insensitively.
///
/// The first spelling of each column is the one that is kept, and the
/// relative order of the surviving names is unchanged.
fn dedupe_modified_columns(modified_columns: Vec<String>) -> Vec<String> {
    let mut kept: Vec<String> = Vec::with_capacity(modified_columns.len());
    for candidate in modified_columns {
        let already_kept = kept
            .iter()
            .any(|seen| seen.eq_ignore_ascii_case(&candidate));
        if !already_kept {
            kept.push(candidate);
        }
    }
    kept
}
1204
1205fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1206 if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1207 return Err(crate::RedDBError::Query(
1208 "array positional document patch paths are unsupported; replace the array or full document body instead"
1209 .to_string(),
1210 ));
1211 }
1212 Ok(())
1213}
1214
1215fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1216 match fields.get("body") {
1217 Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1218 crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1219 }),
1220 Some(_) => Err(crate::RedDBError::Query(
1221 "document body field must contain JSON".to_string(),
1222 )),
1223 None => Ok(JsonValue::Object(Default::default())),
1224 }
1225}
1226
1227fn replace_document_row_body(
1228 fields: &mut HashMap<String, Value>,
1229 body: JsonValue,
1230 modified_columns: &mut Vec<String>,
1231) -> RedDBResult<()> {
1232 modified_columns.push("body".to_string());
1233
1234 let old_keys: Vec<String> = fields
1235 .keys()
1236 .filter(|key| key.as_str() != "body")
1237 .cloned()
1238 .collect();
1239 for key in old_keys {
1240 fields.remove(&key);
1241 modified_columns.push(key);
1242 }
1243
1244 let body_bytes = json_to_vec(&body).map_err(|err| {
1245 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1246 })?;
1247 fields.insert("body".to_string(), Value::Json(body_bytes));
1248
1249 if let JsonValue::Object(map) = &body {
1250 for (key, value) in map {
1251 fields.insert(key.clone(), json_to_storage_value(value)?);
1252 modified_columns.push(key.clone());
1253 }
1254 }
1255
1256 Ok(())
1257}
1258
1259impl RedDBRuntime {
1260 pub(crate) fn apply_loaded_patch_entity_core(
1261 &self,
1262 collection: String,
1263 mut entity: crate::storage::UnifiedEntity,
1264 payload: JsonValue,
1265 operations: Vec<PatchEntityOperation>,
1266 ) -> RedDBResult<AppliedEntityMutation> {
1267 let id = entity.id;
1268 let operations = normalize_ttl_patch_operations(operations)?;
1269 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1272
1273 let db = self.db();
1274 let store = db.store();
1275 let Some(manager) = store.get_collection(&collection) else {
1276 return Err(crate::RedDBError::NotFound(format!(
1277 "collection not found: {collection}"
1278 )));
1279 };
1280
1281 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1282 let mut metadata_changed = false;
1283 let mut modified_columns: Vec<String> = Vec::new();
1284 let mut context_index_dirty = false;
1285 let mut graph_node_type: Option<String> = None;
1286 let mut graph_edge_weight: Option<f32> = None;
1287
1288 let row_contract_timestamps = db
1289 .collection_contract(&collection)
1290 .map(|c| c.timestamps_enabled)
1291 .unwrap_or(false);
1292
1293 match &mut entity.data {
1294 crate::storage::EntityData::Row(row) => {
1295 let is_document_collection = db
1296 .collection_contract(&collection)
1297 .map(|contract| {
1298 contract.declared_model == crate::catalog::CollectionModel::Document
1299 })
1300 .unwrap_or(false);
1301 let mut field_ops = Vec::new();
1302 let mut metadata_ops = Vec::new();
1303 let mut document_body_ops = Vec::new();
1304
1305 for mut op in operations {
1306 let Some(root) = op.path.first().map(String::as_str) else {
1307 return Err(crate::RedDBError::Query(
1308 "patch path cannot be empty".to_string(),
1309 ));
1310 };
1311
1312 match root {
1313 "body" if is_document_collection => {
1314 if op.path.len() < 2 {
1315 return Err(crate::RedDBError::Query(
1316 "document body patch paths require a nested key; use payload.body for full replacement"
1317 .to_string(),
1318 ));
1319 }
1320 op.path.remove(0);
1321 reject_document_array_position_path(&op.path)?;
1322 document_body_ops.push(op);
1323 }
1324 "fields" | "named" => {
1325 if op.path.len() < 2 {
1326 return Err(crate::RedDBError::Query(
1327 "patch path 'fields' requires a nested key".to_string(),
1328 ));
1329 }
1330 if row_contract_timestamps {
1331 let leaf = op.path.get(1).map(String::as_str);
1332 if matches!(leaf, Some("created_at") | Some("updated_at")) {
1333 return Err(crate::RedDBError::Query(format!(
1334 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1335 collection,
1336 leaf.unwrap_or("")
1337 )));
1338 }
1339 }
1340 op.path.remove(0);
1341 field_ops.push(op);
1342 }
1343 "metadata" => {
1344 if op.path.len() < 2 {
1345 return Err(crate::RedDBError::Query(
1346 "patch path 'metadata' requires a nested key".to_string(),
1347 ));
1348 }
1349 op.path.remove(0);
1350 metadata_ops.push(op);
1351 }
1352 _ => {
1353 return Err(crate::RedDBError::Query(format!(
1354 "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1355 )));
1356 }
1357 }
1358 }
1359
1360 if !document_body_ops.is_empty() {
1361 context_index_dirty = true;
1362 let named = row.named.get_or_insert_with(Default::default);
1363 let mut body = document_body_from_named(named)?;
1364 apply_patch_operations_to_json(&mut body, &document_body_ops)
1365 .map_err(crate::RedDBError::Query)?;
1366 replace_document_row_body(named, body, &mut modified_columns)?;
1367 }
1368
1369 if is_document_collection {
1370 if let Some(body) = payload.get("body") {
1371 context_index_dirty = true;
1372 let named = row.named.get_or_insert_with(Default::default);
1373 replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1374 }
1375 }
1376
1377 if !field_ops.is_empty() {
1378 context_index_dirty = true;
1379 for op in &field_ops {
1380 if let Some(col) = op.path.first() {
1381 modified_columns.push(col.clone());
1382 }
1383 }
1384 let named = row.named.get_or_insert_with(Default::default);
1385 apply_patch_operations_to_storage_map(named, &field_ops)?;
1386 }
1387
1388 if let Some(fields) = payload
1389 .get("fields")
1390 .and_then(crate::json::Value::as_object)
1391 {
1392 if row_contract_timestamps {
1393 for key in fields.keys() {
1394 if key == "created_at" || key == "updated_at" {
1395 return Err(crate::RedDBError::Query(format!(
1396 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1397 collection, key
1398 )));
1399 }
1400 }
1401 }
1402 context_index_dirty = true;
1403 let named = row.named.get_or_insert_with(Default::default);
1404 for (key, value) in fields {
1405 modified_columns.push(key.clone());
1406 named.insert(key.clone(), json_to_storage_value(value)?);
1407 }
1408 }
1409
1410 if !metadata_ops.is_empty() {
1411 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1412 let metadata = patch_metadata.get_or_insert_with(|| {
1413 store.get_metadata(&collection, id).unwrap_or_default()
1414 });
1415 let mut metadata_json = metadata_to_json(metadata);
1416 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1417 .map_err(crate::RedDBError::Query)?;
1418 *metadata = metadata_from_json(&metadata_json)?;
1419 metadata_changed = true;
1420 }
1421
1422 if !modified_columns.is_empty() || row_contract_timestamps {
1423 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1424 let current_fields = if let Some(named) = row.named.take() {
1425 named.into_iter().collect::<Vec<_>>()
1426 } else if let Some(schema) = row.schema.as_ref() {
1427 schema
1428 .iter()
1429 .cloned()
1430 .zip(row.columns.iter().cloned())
1431 .collect::<Vec<_>>()
1432 } else {
1433 Vec::new()
1434 };
1435 let normalized_fields = contract.normalize_update_fields(current_fields)?;
1436 if row_contract_timestamps {
1437 modified_columns.push("updated_at".to_string());
1438 context_index_dirty = true;
1439 }
1440 if contract.requires_uniqueness_check(&modified_columns) {
1441 contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1442 }
1443 row.named = Some(normalized_fields.into_iter().collect());
1444 }
1445 }
1446 crate::storage::EntityData::Node(node) => {
1447 let mut field_ops = Vec::new();
1448 let mut metadata_ops = Vec::new();
1449 let mut node_type_ops = Vec::new();
1450
1451 for mut op in operations {
1452 let Some(root) = op.path.first().map(String::as_str) else {
1453 return Err(crate::RedDBError::Query(
1454 "patch path cannot be empty".to_string(),
1455 ));
1456 };
1457
1458 match root {
1459 "fields" | "properties" => {
1460 if op.path.len() < 2 {
1461 return Err(crate::RedDBError::Query(
1462 "patch path 'fields' requires a nested key".to_string(),
1463 ));
1464 }
1465 op.path.remove(0);
1466 field_ops.push(op);
1467 }
1468 "metadata" => {
1469 if op.path.len() < 2 {
1470 return Err(crate::RedDBError::Query(
1471 "patch path 'metadata' requires a nested key".to_string(),
1472 ));
1473 }
1474 op.path.remove(0);
1475 metadata_ops.push(op);
1476 }
1477 "node_type" => {
1478 if op.path.len() != 1 {
1479 return Err(crate::RedDBError::Query(
1480 "patch path 'node_type' does not allow nested keys".to_string(),
1481 ));
1482 }
1483 op.path.clear();
1484 node_type_ops.push(op);
1485 }
1486 _ => {
1487 return Err(crate::RedDBError::Query(format!(
1488 "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1489 )));
1490 }
1491 }
1492 }
1493
1494 for op in node_type_ops {
1495 context_index_dirty = true;
1496 let value = op.value.ok_or_else(|| {
1497 crate::RedDBError::Query("node_type operations require a value".to_string())
1498 })?;
1499
1500 match op.op {
1501 PatchEntityOperationType::Unset => {
1502 return Err(crate::RedDBError::Query(
1503 "node_type cannot be unset through patch operations".to_string(),
1504 ));
1505 }
1506 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1507 let Some(node_type) = value.as_str() else {
1508 return Err(crate::RedDBError::Query(
1509 "node_type operation requires a text value".to_string(),
1510 ));
1511 };
1512 graph_node_type = Some(node_type.to_string());
1513 modified_columns.push("node_type".to_string());
1514 }
1515 }
1516 }
1517
1518 if !field_ops.is_empty() {
1519 context_index_dirty = true;
1520 apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1521 }
1522
1523 if let Some(fields) = payload
1524 .get("fields")
1525 .and_then(crate::json::Value::as_object)
1526 {
1527 context_index_dirty = true;
1528 for (key, value) in fields {
1529 node.properties
1530 .insert(key.clone(), json_to_storage_value(value)?);
1531 }
1532 }
1533
1534 if !metadata_ops.is_empty() {
1535 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1536 let metadata = patch_metadata.get_or_insert_with(|| {
1537 store.get_metadata(&collection, id).unwrap_or_default()
1538 });
1539 let mut metadata_json = metadata_to_json(metadata);
1540 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1541 .map_err(crate::RedDBError::Query)?;
1542 *metadata = metadata_from_json(&metadata_json)?;
1543 metadata_changed = true;
1544 }
1545 }
1546 crate::storage::EntityData::Edge(edge) => {
1547 let mut field_ops = Vec::new();
1548 let mut metadata_ops = Vec::new();
1549 let mut weight_ops = Vec::new();
1550
1551 for mut op in operations {
1552 let Some(root) = op.path.first().map(String::as_str) else {
1553 return Err(crate::RedDBError::Query(
1554 "patch path cannot be empty".to_string(),
1555 ));
1556 };
1557
1558 match root {
1559 "fields" | "properties" => {
1560 if op.path.len() < 2 {
1561 return Err(crate::RedDBError::Query(
1562 "patch path 'fields' requires a nested key".to_string(),
1563 ));
1564 }
1565 op.path.remove(0);
1566 field_ops.push(op);
1567 }
1568 "weight" => {
1569 if op.path.len() != 1 {
1570 return Err(crate::RedDBError::Query(
1571 "patch path 'weight' does not allow nested keys".to_string(),
1572 ));
1573 }
1574 op.path.clear();
1575 weight_ops.push(op);
1576 }
1577 "metadata" => {
1578 if op.path.len() < 2 {
1579 return Err(crate::RedDBError::Query(
1580 "patch path 'metadata' requires a nested key".to_string(),
1581 ));
1582 }
1583 op.path.remove(0);
1584 metadata_ops.push(op);
1585 }
1586 _ => {
1587 return Err(crate::RedDBError::Query(format!(
1588 "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1589 )));
1590 }
1591 }
1592 }
1593
1594 if !field_ops.is_empty() {
1595 context_index_dirty = true;
1596 apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1597 }
1598
1599 for op in weight_ops {
1600 context_index_dirty = true;
1601 let value = op.value.ok_or_else(|| {
1602 crate::RedDBError::Query("weight operations require a value".to_string())
1603 })?;
1604
1605 match op.op {
1606 PatchEntityOperationType::Unset => {
1607 return Err(crate::RedDBError::Query(
1608 "weight cannot be unset through patch operations".to_string(),
1609 ));
1610 }
1611 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1612 let Some(weight) = value.as_f64() else {
1613 return Err(crate::RedDBError::Query(
1614 "weight operation requires a numeric value".to_string(),
1615 ));
1616 };
1617 graph_edge_weight = Some(weight as f32);
1618 edge.weight = weight as f32;
1619 modified_columns.push("weight".to_string());
1620 }
1621 }
1622 }
1623
1624 if let Some(fields) = payload
1625 .get("fields")
1626 .and_then(crate::json::Value::as_object)
1627 {
1628 context_index_dirty = true;
1629 for (key, value) in fields {
1630 edge.properties
1631 .insert(key.clone(), json_to_storage_value(value)?);
1632 }
1633 }
1634
1635 if !metadata_ops.is_empty() {
1636 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1637 let metadata = patch_metadata.get_or_insert_with(|| {
1638 store.get_metadata(&collection, id).unwrap_or_default()
1639 });
1640 let mut metadata_json = metadata_to_json(metadata);
1641 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1642 .map_err(crate::RedDBError::Query)?;
1643 *metadata = metadata_from_json(&metadata_json)?;
1644 metadata_changed = true;
1645 }
1646 }
1647 crate::storage::EntityData::Vector(vector) => {
1648 let mut field_ops = Vec::new();
1649 let mut metadata_ops = Vec::new();
1650
1651 for mut op in operations {
1652 let Some(root) = op.path.first().map(String::as_str) else {
1653 return Err(crate::RedDBError::Query(
1654 "patch path cannot be empty".to_string(),
1655 ));
1656 };
1657
1658 match root {
1659 "fields" => {
1660 if op.path.len() < 2 {
1661 return Err(crate::RedDBError::Query(
1662 "patch path 'fields' requires a nested key".to_string(),
1663 ));
1664 }
1665 op.path.remove(0);
1666 let Some(target) = op.path.first().map(String::as_str) else {
1667 return Err(crate::RedDBError::Query(
1668 "patch path requires a target under fields".to_string(),
1669 ));
1670 };
1671 if !matches!(target, "dense" | "content" | "sparse") {
1672 return Err(crate::RedDBError::Query(format!(
1673 "unsupported vector patch target '{target}'"
1674 )));
1675 }
1676 field_ops.push(op);
1677 }
1678 "metadata" => {
1679 if op.path.len() < 2 {
1680 return Err(crate::RedDBError::Query(
1681 "patch path 'metadata' requires a nested key".to_string(),
1682 ));
1683 }
1684 op.path.remove(0);
1685 metadata_ops.push(op);
1686 }
1687 _ => {
1688 return Err(crate::RedDBError::Query(format!(
1689 "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1690 )));
1691 }
1692 }
1693 }
1694
1695 if !field_ops.is_empty() {
1696 context_index_dirty = true;
1697 apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1698 }
1699
1700 if let Some(fields) = payload
1701 .get("fields")
1702 .and_then(crate::json::Value::as_object)
1703 {
1704 context_index_dirty = true;
1705 if let Some(content) =
1706 fields.get("content").and_then(crate::json::Value::as_str)
1707 {
1708 vector.content = Some(content.to_string());
1709 }
1710 if let Some(dense) = fields.get("dense") {
1711 vector.dense = dense
1712 .as_array()
1713 .ok_or_else(|| {
1714 crate::RedDBError::Query(
1715 "field 'dense' must be an array".to_string(),
1716 )
1717 })?
1718 .iter()
1719 .map(|value| {
1720 value.as_f64().map(|value| value as f32).ok_or_else(|| {
1721 crate::RedDBError::Query(
1722 "field 'dense' must contain only numbers".to_string(),
1723 )
1724 })
1725 })
1726 .collect::<Result<Vec<_>, _>>()?;
1727 }
1728 }
1729
1730 if !metadata_ops.is_empty() {
1731 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1732 let metadata = patch_metadata.get_or_insert_with(|| {
1733 store.get_metadata(&collection, id).unwrap_or_default()
1734 });
1735 let mut metadata_json = metadata_to_json(metadata);
1736 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1737 .map_err(crate::RedDBError::Query)?;
1738 *metadata = metadata_from_json(&metadata_json)?;
1739 metadata_changed = true;
1740 }
1741 }
1742 crate::storage::EntityData::TimeSeries(_)
1743 | crate::storage::EntityData::QueueMessage(_) => {
1744 return Err(crate::RedDBError::Query(
1745 "patch operations are not supported for TimeSeries or QueueMessage entities"
1746 .to_string(),
1747 ));
1748 }
1749 }
1750
1751 if let Some(node_type) = graph_node_type {
1752 if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1753 node.node_type = node_type;
1754 }
1755 }
1756 if let Some(weight) = graph_edge_weight {
1757 if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1758 edge.weight = (weight * 1000.0) as u32;
1759 }
1760 }
1761
1762 if let Some(metadata) = payload
1763 .get("metadata")
1764 .and_then(crate::json::Value::as_object)
1765 {
1766 let patch_metadata = patch_metadata
1767 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1768 for (key, value) in metadata {
1769 ensure_non_tree_reserved_metadata_key(key)?;
1770 patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1771 }
1772 metadata_changed = true;
1773 }
1774
1775 for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1776 let patch_metadata = patch_metadata
1777 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1778 if matches!(value, crate::storage::unified::MetadataValue::Null) {
1779 patch_metadata.remove(&key);
1780 } else {
1781 patch_metadata.set(key, value);
1782 }
1783 metadata_changed = true;
1784 }
1785
1786 entity.updated_at = std::time::SystemTime::now()
1787 .duration_since(std::time::UNIX_EPOCH)
1788 .unwrap_or_default()
1789 .as_secs();
1790
1791 modified_columns = dedupe_modified_columns(modified_columns);
1792
1793 Ok(AppliedEntityMutation {
1794 id,
1795 collection,
1796 entity,
1797 metadata: patch_metadata,
1798 modified_columns,
1799 persist_metadata: metadata_changed,
1800 context_index_dirty,
1801 replaced_entity: None,
1802 replaced_entity_previous_xmax: 0,
1803 pre_mutation_fields,
1804 })
1805 }
1806
1807 pub(crate) fn apply_loaded_sql_update_row_core(
1808 &self,
1809 collection: String,
1810 mut entity: crate::storage::UnifiedEntity,
1811 static_field_assignments: &[(String, Value)],
1812 dynamic_field_assignments: Vec<(String, Value)>,
1813 static_metadata_assignments: &[(String, MetadataValue)],
1814 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1815 row_contract_plan: Option<&RowUpdateContractPlan>,
1816 row_modified_columns_template: &[String],
1817 row_touches_unique_columns: bool,
1818 ) -> RedDBResult<AppliedEntityMutation> {
1819 let id = entity.id;
1820 let previous_xmax = entity.xmax;
1821 let db = self.db();
1822 let store = db.store();
1823 let Some(_) = store.get_collection(&collection) else {
1824 return Err(crate::RedDBError::NotFound(format!(
1825 "collection not found: {collection}"
1826 )));
1827 };
1828
1829 let versioned_update_xid = match self.current_xid() {
1830 Some(xid) => Some(xid),
1831 None => {
1832 let snapshot_manager = self.snapshot_manager();
1833 let xid = snapshot_manager.begin();
1834 snapshot_manager.commit(xid);
1835 Some(xid)
1836 }
1837 };
1838 let mut replaced_entity = versioned_update_xid.map(|xid| {
1839 let mut old = entity.clone();
1840 old.set_xmax(xid);
1841 old
1842 });
1843
1844 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1845 let row_contract_timestamps = row_contract_plan
1846 .map(|plan| plan.timestamps_enabled)
1847 .unwrap_or(false);
1848 let mut metadata_changed = false;
1849 let mut modified_columns = row_modified_columns_template.to_vec();
1850 let mut context_index_dirty = !modified_columns.is_empty();
1851
1852 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1856
1857 let crate::storage::EntityData::Row(row) = &mut entity.data else {
1858 return Err(crate::RedDBError::Query(
1859 "SQL row update fast path requires a row entity".to_string(),
1860 ));
1861 };
1862
1863 let _ = row_contract_plan;
1864 apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1865 apply_row_field_assignments_raw(row, dynamic_field_assignments);
1866
1867 for (key, value) in static_metadata_assignments
1868 .iter()
1869 .cloned()
1870 .chain(dynamic_metadata_assignments)
1871 {
1872 ensure_non_tree_reserved_metadata_key(&key)?;
1873 patch_metadata
1874 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1875 .set(key, value);
1876 metadata_changed = true;
1877 }
1878
1879 if !modified_columns.is_empty() || row_contract_timestamps {
1880 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1881 if row_contract_timestamps {
1882 context_index_dirty = true;
1883 set_row_field(
1884 row,
1885 "updated_at",
1886 Value::UnsignedInteger(current_unix_ms_u64()),
1887 );
1888 modified_columns.push("updated_at".to_string());
1889 }
1890 if row_touches_unique_columns {
1891 let current_fields = collect_row_fields(row);
1892 contract.enforce_row_uniqueness(¤t_fields, Some(id))?;
1893 }
1894 }
1895
1896 entity.updated_at = std::time::SystemTime::now()
1897 .duration_since(std::time::UNIX_EPOCH)
1898 .unwrap_or_default()
1899 .as_secs();
1900
1901 if let Some(xid) = versioned_update_xid {
1902 let logical_id = entity.logical_id();
1903 entity.id = store.next_entity_id();
1904 entity.set_logical_id(logical_id);
1905 entity.set_xmin(xid);
1906 entity.set_xmax(0);
1907 if let Some(old) = replaced_entity.as_mut() {
1908 old.set_xmax(xid);
1909 }
1910 }
1911
1912 modified_columns = dedupe_modified_columns(modified_columns);
1913
1914 Ok(AppliedEntityMutation {
1915 id: entity.id,
1916 collection,
1917 entity,
1918 metadata: patch_metadata,
1919 modified_columns,
1920 persist_metadata: metadata_changed,
1921 context_index_dirty,
1922 replaced_entity,
1923 replaced_entity_previous_xmax: previous_xmax,
1924 pre_mutation_fields,
1925 })
1926 }
1927
1928 pub(crate) fn persist_applied_entity_mutations(
1929 &self,
1930 applied: &[AppliedEntityMutation],
1931 ) -> RedDBResult<()> {
1932 if applied.is_empty() {
1933 return Ok(());
1934 }
1935
1936 let store = self.db().store();
1937 let collection = &applied[0].collection;
1938 let Some(manager) = store.get_collection(collection) else {
1939 return Err(crate::RedDBError::NotFound(format!(
1940 "collection not found: {collection}"
1941 )));
1942 };
1943
1944 let mut ordinary = Vec::with_capacity(applied.len());
1945 for item in applied {
1946 if let Some(old_version) = item.replaced_entity.as_ref() {
1947 store
1948 .install_versioned_table_row_update(
1949 collection,
1950 old_version.clone(),
1951 item.entity.clone(),
1952 if item.persist_metadata {
1953 item.metadata.as_ref()
1954 } else {
1955 None
1956 },
1957 )
1958 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1959 if self.current_xid().is_some() {
1960 self.record_pending_versioned_update(
1961 crate::runtime::impl_core::current_connection_id(),
1962 collection,
1963 old_version.id,
1964 item.entity.id,
1965 old_version.xmax,
1966 item.replaced_entity_previous_xmax,
1967 );
1968 }
1969 } else {
1970 ordinary.push(item);
1971 }
1972 }
1973 if ordinary.is_empty() {
1974 return Ok(());
1975 }
1976
1977 manager
1978 .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1979 (
1980 &item.entity,
1981 item.modified_columns.as_slice(),
1982 if item.persist_metadata {
1983 item.metadata.as_ref()
1984 } else {
1985 None
1986 },
1987 )
1988 }))
1989 .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1990
1991 let indexed_cols = self
2000 .index_store_ref()
2001 .indexed_columns_set(collection.as_str());
2002 let all_hot = !indexed_cols.is_empty()
2003 && ordinary.iter().all(|item| {
2004 !item.persist_metadata
2005 && !item
2006 .modified_columns
2007 .iter()
2008 .any(|c| indexed_cols.contains(c))
2009 })
2010 || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2011
2012 let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2016 ordinary.iter().map(|item| &item.entity).collect();
2017 let persist_fn = if all_hot {
2018 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2019 } else {
2020 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2021 };
2022 persist_fn(store.as_ref(), collection, &entity_refs)
2023 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2024 }
2025
2026 pub(crate) fn flush_applied_entity_mutation(
2027 &self,
2028 applied: &AppliedEntityMutation,
2029 ) -> RedDBResult<()> {
2030 let store = self.db().store();
2031 if applied.context_index_dirty {
2032 store
2033 .context_index()
2034 .index_entity(&applied.collection, &applied.entity);
2035 }
2036 let mut changed_columns: Option<Vec<String>> = None;
2044 if !applied.pre_mutation_fields.is_empty() {
2045 let post = entity_row_fields_snapshot(&applied.entity);
2046 if !post.is_empty() {
2047 let damage = crate::application::entity::row_damage_vector(
2048 &applied.pre_mutation_fields,
2049 &post,
2050 );
2051 if !damage.is_empty() {
2052 changed_columns = Some(
2053 damage
2054 .touched_columns()
2055 .into_iter()
2056 .map(str::to_string)
2057 .collect(),
2058 );
2059 }
2060
2061 let indexed_cols: std::collections::HashSet<String> = self
2070 .index_store_ref()
2071 .list_indices(applied.collection.as_str())
2072 .into_iter()
2073 .filter_map(|idx| idx.columns.first().cloned())
2074 .collect();
2075 let modified_cols: std::collections::HashSet<String> = damage
2076 .touched_columns()
2077 .into_iter()
2078 .map(str::to_string)
2079 .collect();
2080 if let Some(old_version) = applied.replaced_entity.as_ref() {
2081 let old_index_fields: Vec<(String, Value)> = applied
2082 .pre_mutation_fields
2083 .iter()
2084 .filter(|(col, _)| indexed_cols.contains(col))
2085 .cloned()
2086 .collect();
2087 let new_index_fields: Vec<(String, Value)> = post
2088 .iter()
2089 .filter(|(col, _)| indexed_cols.contains(col))
2090 .cloned()
2091 .collect();
2092 if !old_index_fields.is_empty() {
2093 self.index_store_ref()
2094 .index_entity_delete(
2095 &applied.collection,
2096 old_version.id,
2097 &old_index_fields,
2098 )
2099 .map_err(crate::RedDBError::Internal)?;
2100 }
2101 if !new_index_fields.is_empty() {
2102 self.index_store_ref()
2103 .index_entity_insert(
2104 &applied.collection,
2105 applied.entity.id,
2106 &new_index_fields,
2107 )
2108 .map_err(crate::RedDBError::Internal)?;
2109 }
2110 } else {
2111 let decision = crate::storage::engine::hot_update::decide(
2112 &crate::storage::engine::hot_update::HotUpdateInputs {
2113 collection: applied.collection.as_str(),
2114 indexed_columns: &indexed_cols,
2115 modified_columns: &modified_cols,
2116 new_tuple_size: 0,
2120 page_free_space: usize::MAX,
2121 },
2122 );
2123 if !decision.can_hot {
2124 self.index_store_ref()
2125 .index_entity_update(
2126 &applied.collection,
2127 applied.id,
2128 &applied.pre_mutation_fields,
2129 &post,
2130 )
2131 .map_err(crate::RedDBError::Internal)?;
2132 } else {
2133 tracing::debug!(
2137 collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2138 "hot_update fast-path: skipped index_entity_update"
2139 );
2140 }
2141 }
2142 }
2143 }
2144 self.cdc_emit_prebuilt_with_columns(
2145 crate::replication::cdc::ChangeOperation::Update,
2146 &applied.collection,
2147 &applied.entity,
2148 "entity",
2149 applied.metadata.as_ref(),
2150 true,
2151 changed_columns,
2152 );
2153 Ok(())
2154 }
2155
2156 pub(crate) fn apply_loaded_patch_entity(
2157 &self,
2158 collection: String,
2159 entity: crate::storage::UnifiedEntity,
2160 payload: JsonValue,
2161 operations: Vec<PatchEntityOperation>,
2162 ) -> RedDBResult<CreateEntityOutput> {
2163 let applied =
2164 self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2165 self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2166 self.flush_applied_entity_mutation(&applied)?;
2167 Ok(CreateEntityOutput {
2168 id: applied.id,
2169 entity: Some(applied.entity),
2170 })
2171 }
2172}
2173
2174fn ensure_non_tree_reserved_metadata_patch_paths(
2175 operations: &[PatchEntityOperation],
2176) -> RedDBResult<()> {
2177 for operation in operations {
2178 let Some(key) = operation.path.first().map(String::as_str) else {
2179 continue;
2180 };
2181 ensure_non_tree_reserved_metadata_key(key)?;
2182 }
2183 Ok(())
2184}
2185
2186fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2187 if key.starts_with(TREE_METADATA_PREFIX) {
2188 return Err(crate::RedDBError::Query(format!(
2189 "metadata key '{}' is reserved for managed trees",
2190 key
2191 )));
2192 }
2193 Ok(())
2194}
2195
2196fn ensure_non_tree_reserved_metadata_entries(
2197 metadata: &[(String, MetadataValue)],
2198) -> RedDBResult<()> {
2199 for (key, _) in metadata {
2200 ensure_non_tree_reserved_metadata_key(key)?;
2201 }
2202 Ok(())
2203}
2204
2205fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2206 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2207 return Err(crate::RedDBError::Query(format!(
2208 "edge label '{}' is reserved for managed trees",
2209 TREE_CHILD_EDGE_LABEL
2210 )));
2211 }
2212 Ok(())
2213}
2214
2215impl RedDBRuntime {
2216 pub(crate) fn create_node_unchecked(
2217 &self,
2218 input: CreateNodeInput,
2219 ) -> RedDBResult<CreateEntityOutput> {
2220 let db = self.db();
2221 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2222 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2223 let mut metadata = input.metadata;
2224 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2225 let mut builder = db.node(&input.collection, &input.label);
2226
2227 if let Some(node_type) = input.node_type {
2228 builder = builder.node_type(node_type);
2229 }
2230
2231 for (key, value) in input.properties {
2232 builder = builder.property(key, value);
2233 }
2234
2235 for (key, value) in metadata {
2236 builder = builder.metadata(key, value);
2237 }
2238
2239 for embedding in input.embeddings {
2240 if let Some(model) = embedding.model {
2241 builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2242 } else {
2243 builder = builder.embedding(embedding.name, embedding.vector);
2244 }
2245 }
2246
2247 for link in input.table_links {
2248 builder = builder.link_to_table(link.key, link.table);
2249 }
2250
2251 for link in input.node_links {
2252 builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2253 }
2254
2255 let id = builder.save()?;
2256 self.stamp_xmin_if_in_txn(&input.collection, id);
2259 refresh_context_index(&db, &input.collection, id)?;
2260 self.cdc_emit(
2261 crate::replication::cdc::ChangeOperation::Insert,
2262 &input.collection,
2263 id.raw(),
2264 "graph_node",
2265 );
2266 Ok(CreateEntityOutput {
2267 id,
2268 entity: db.store().get(&input.collection, id),
2269 })
2270 }
2271
2272 pub(crate) fn create_edge_unchecked(
2273 &self,
2274 input: CreateEdgeInput,
2275 ) -> RedDBResult<CreateEntityOutput> {
2276 let db = self.db();
2277 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2278 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2279 let mut metadata = input.metadata;
2280 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2281 let mut builder = db
2282 .edge(&input.collection, &input.label)
2283 .from(input.from)
2284 .to(input.to);
2285
2286 if let Some(weight) = input.weight {
2287 builder = builder.weight(weight);
2288 }
2289
2290 for (key, value) in input.properties {
2291 builder = builder.property(key, value);
2292 }
2293
2294 for (key, value) in metadata {
2295 builder = builder.metadata(key, value);
2296 }
2297
2298 let id = builder.save()?;
2299 self.stamp_xmin_if_in_txn(&input.collection, id);
2302 refresh_context_index(&db, &input.collection, id)?;
2303 self.cdc_emit(
2304 crate::replication::cdc::ChangeOperation::Insert,
2305 &input.collection,
2306 id.raw(),
2307 "graph_edge",
2308 );
2309 Ok(CreateEntityOutput {
2310 id,
2311 entity: db.store().get(&input.collection, id),
2312 })
2313 }
2314}
2315
2316fn create_rows_batch_prevalidated_columnar_with_outputs(
2317 runtime: &RedDBRuntime,
2318 collection: String,
2319 column_names: std::sync::Arc<Vec<String>>,
2320 rows: Vec<Vec<crate::storage::schema::Value>>,
2321) -> RedDBResult<Vec<CreateEntityOutput>> {
2322 use crate::storage::{
2323 unified::{EntityData, EntityKind, RowData},
2324 EntityId, UnifiedEntity,
2325 };
2326 use std::sync::Arc;
2327
2328 if rows.is_empty() {
2329 return Ok(Vec::new());
2330 }
2331 runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2332 runtime.check_batch_size(rows.len())?;
2333 runtime.check_db_size()?;
2334
2335 let db = runtime.db();
2336 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2337 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2338
2339 let store = db.store();
2340 let table_arc: Arc<str> = Arc::from(collection.as_str());
2341
2342 let indexed_cols = runtime
2343 .index_store_ref()
2344 .indexed_columns_set(collection.as_str());
2345 let has_secondary_indexes = !indexed_cols.is_empty();
2346 let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2347 if has_secondary_indexes {
2348 Vec::with_capacity(rows.len())
2349 } else {
2350 Vec::new()
2351 };
2352
2353 let entities: Vec<UnifiedEntity> = rows
2354 .into_iter()
2355 .map(|values| {
2356 if has_secondary_indexes {
2357 let mut snap: Vec<(String, crate::storage::schema::Value)> =
2358 Vec::with_capacity(indexed_cols.len());
2359 for (name, val) in column_names.iter().zip(values.iter()) {
2360 if indexed_cols.contains(name) {
2361 snap.push((name.clone(), val.clone()));
2362 }
2363 }
2364 field_snapshots.push(snap);
2365 }
2366 let mut row = RowData::new(values);
2367 row.schema = Some(Arc::clone(&column_names));
2368 UnifiedEntity::new(
2369 EntityId::new(0),
2370 EntityKind::TableRow {
2371 table: Arc::clone(&table_arc),
2372 row_id: 0,
2373 },
2374 EntityData::Row(row),
2375 )
2376 })
2377 .collect();
2378
2379 let ids = store
2380 .bulk_insert(&collection, entities)
2381 .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2382
2383 if has_secondary_indexes {
2384 let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2385 .iter()
2386 .zip(field_snapshots)
2387 .map(|(id, fields)| (*id, fields))
2388 .collect();
2389 runtime
2390 .index_store_ref()
2391 .index_entity_insert_batch(&collection, &index_rows)
2392 .map_err(crate::RedDBError::Internal)?;
2393 }
2394
2395 runtime.invalidate_result_cache();
2396 runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2397
2398 Ok(ids
2399 .into_iter()
2400 .map(|id| CreateEntityOutput { id, entity: None })
2401 .collect())
2402}
2403
2404impl RuntimeEntityPort for RedDBRuntime {
2405 fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2406 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2407 let db = self.db();
2408 let CreateRowInput {
2409 collection,
2410 fields,
2411 metadata: input_metadata,
2412 node_links,
2413 vector_links,
2414 } = input;
2415 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2416 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2417 let mut metadata = input_metadata;
2418 apply_collection_default_ttl(&db, &collection, &mut metadata);
2419 let fields = contract.normalize_insert_fields(fields)?;
2420 contract.enforce_row_uniqueness(&fields, None)?;
2421 let engine = self.mutation_engine();
2423 let result = engine.apply(
2424 collection.clone(),
2425 vec![crate::runtime::mutation::MutationRow {
2426 fields,
2427 metadata,
2428 node_links,
2429 vector_links,
2430 }],
2431 )?;
2432 let id = result.ids[0];
2433 Ok(CreateEntityOutput {
2438 id,
2439 entity: db.store().get(&collection, id),
2440 })
2441 }
2442
2443 fn create_rows_batch(
2444 &self,
2445 input: CreateRowsBatchInput,
2446 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2447 if input.rows.is_empty() {
2448 return Ok(Vec::new());
2449 }
2450 self.check_batch_size(input.rows.len())?;
2451 self.check_db_size()?;
2452
2453 let db = self.db();
2454 let collection = input.collection;
2455 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2456 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2457
2458 let mut prepared_rows = Vec::with_capacity(input.rows.len());
2459 let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2460 for row in input.rows {
2461 if row.collection != collection {
2462 return Err(crate::RedDBError::Query(format!(
2463 "batch row collection mismatch: expected '{}', got '{}'",
2464 collection, row.collection
2465 )));
2466 }
2467
2468 let mut metadata = row.metadata;
2469 apply_collection_default_ttl(&db, &collection, &mut metadata);
2470 let fields = contract.normalize_insert_fields(row.fields)?;
2471 contract.enforce_row_uniqueness(&fields, None)?;
2472 uniqueness_rows.push(fields.clone());
2473 prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2474 }
2475
2476 contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2477
2478 let engine = {
2481 let e = self.mutation_engine();
2482 if input.suppress_events {
2483 e.with_suppress_events()
2484 } else {
2485 e
2486 }
2487 };
2488 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2489 .into_iter()
2490 .map(|(fields, metadata, node_links, vector_links)| {
2491 crate::runtime::mutation::MutationRow {
2492 fields,
2493 metadata,
2494 node_links,
2495 vector_links,
2496 }
2497 })
2498 .collect();
2499
2500 let result = engine
2501 .apply(collection.clone(), mutation_rows)
2502 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2503
2504 let store = db.store();
2505 Ok(result
2506 .ids
2507 .into_iter()
2508 .map(|id| CreateEntityOutput {
2509 id,
2510 entity: store.get(&collection, id),
2511 })
2512 .collect())
2513 }
2514
2515 fn create_rows_batch_prevalidated_columnar(
2516 &self,
2517 collection: String,
2518 column_names: std::sync::Arc<Vec<String>>,
2519 rows: Vec<Vec<crate::storage::schema::Value>>,
2520 ) -> RedDBResult<usize> {
2521 create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2522 .map(|outputs| outputs.len())
2523 }
2524
2525 fn create_rows_batch_columnar(
2526 &self,
2527 collection: String,
2528 column_names: std::sync::Arc<Vec<String>>,
2529 rows: Vec<Vec<crate::storage::schema::Value>>,
2530 ) -> RedDBResult<usize> {
2531 self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2532 .map(|outputs| outputs.len())
2533 }
2534
2535 fn create_rows_batch_columnar_with_outputs(
2536 &self,
2537 collection: String,
2538 column_names: std::sync::Arc<Vec<String>>,
2539 rows: Vec<Vec<crate::storage::schema::Value>>,
2540 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2541 if rows.is_empty() {
2542 return Ok(Vec::new());
2543 }
2544 self.check_batch_size(rows.len())?;
2545 self.check_db_size()?;
2546
2547 let db = self.db();
2548 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2549 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2550
2551 let needs_normalisation = match db.collection_contract(&collection) {
2561 Some(c) => {
2562 c.declared_model == crate::catalog::CollectionModel::Table
2563 && (!c.declared_columns.is_empty()
2564 || c.table_def
2565 .as_ref()
2566 .map(|t| !t.columns.is_empty())
2567 .unwrap_or(false))
2568 }
2569 None => false,
2570 };
2571 if !needs_normalisation {
2572 return create_rows_batch_prevalidated_columnar_with_outputs(
2573 self,
2574 collection,
2575 column_names,
2576 rows,
2577 );
2578 }
2579
2580 let ncols = column_names.len();
2587 let tuple_rows: Vec<CreateRowInput> = rows
2588 .into_iter()
2589 .map(|values| {
2590 let mut fields: Vec<(String, crate::storage::schema::Value)> =
2591 Vec::with_capacity(ncols);
2592 for (name, value) in column_names.iter().zip(values) {
2593 fields.push((name.clone(), value));
2594 }
2595 CreateRowInput {
2596 collection: collection.clone(),
2597 fields,
2598 metadata: Vec::new(),
2599 node_links: Vec::new(),
2600 vector_links: Vec::new(),
2601 }
2602 })
2603 .collect();
2604 self.create_rows_batch(CreateRowsBatchInput {
2605 collection,
2606 rows: tuple_rows,
2607 suppress_events: false,
2608 })
2609 }
2610
2611 fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2612 if input.rows.is_empty() {
2613 return Ok(0);
2614 }
2615 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2616 self.check_batch_size(input.rows.len())?;
2617 self.check_db_size()?;
2618
2619 let db = self.db();
2620 let collection = input.collection;
2621 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2626 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2627
2628 let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2634
2635 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2636 Vec::with_capacity(input.rows.len());
2637 let mut mutation_rows = mutation_rows;
2638 for row in input.rows {
2639 if row.collection != collection {
2640 return Err(crate::RedDBError::Query(format!(
2641 "batch row collection mismatch: expected '{}', got '{}'",
2642 collection, row.collection
2643 )));
2644 }
2645 let mut metadata = row.metadata;
2646 if let Some(ttl) = default_ttl_ms {
2647 if !has_internal_ttl_metadata(&metadata) {
2648 metadata.push((
2649 "_ttl_ms".to_string(),
2650 if ttl <= i64::MAX as u64 {
2651 MetadataValue::Int(ttl as i64)
2652 } else {
2653 MetadataValue::Timestamp(ttl)
2654 },
2655 ));
2656 }
2657 }
2658 mutation_rows.push(crate::runtime::mutation::MutationRow {
2659 fields: row.fields,
2660 metadata,
2661 node_links: row.node_links,
2662 vector_links: row.vector_links,
2663 });
2664 }
2665
2666 let engine = self.mutation_engine();
2667 let result = engine
2668 .apply(collection, mutation_rows)
2669 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2670 Ok(result.ids.len())
2671 }
2672
2673 fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2674 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2675 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2676 input.properties.iter().map(|(key, _)| key.as_str()),
2677 &format!("node '{}'", input.collection),
2678 )?;
2679 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2680 self.create_node_unchecked(input)
2681 }
2682
2683 fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2684 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2685 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2686 input.properties.iter().map(|(key, _)| key.as_str()),
2687 &format!("edge '{}'", input.collection),
2688 )?;
2689 ensure_non_tree_structural_edge_label(&input.label)?;
2690 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2691 self.create_edge_unchecked(input)
2692 }
2693
2694 fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2695 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2696 let db = self.db();
2697 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2698 contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2699 ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2700 let mut metadata = input.metadata;
2701 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2702 let mut builder = db.vector(&input.collection).dense(input.dense);
2703
2704 if let Some(content) = input.content {
2705 builder = builder.content(content);
2706 }
2707
2708 for (key, value) in metadata {
2709 builder = builder.metadata(key, value);
2710 }
2711
2712 if let Some(link_row) = input.link_row {
2713 builder = builder.link_to_table(link_row);
2714 }
2715
2716 if let Some(link_node) = input.link_node {
2717 builder = builder.link_to_node(link_node);
2718 }
2719
2720 let id = builder.save()?;
2721 self.stamp_xmin_if_in_txn(&input.collection, id);
2724 refresh_context_index(&db, &input.collection, id)?;
2725 self.cdc_emit(
2726 crate::replication::cdc::ChangeOperation::Insert,
2727 &input.collection,
2728 id.raw(),
2729 "vector",
2730 );
2731 Ok(CreateEntityOutput {
2732 id,
2733 entity: db.store().get(&input.collection, id),
2734 })
2735 }
2736
2737 fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2738 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2739 let db = self.db();
2740 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2741 contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2742
2743 if let JsonValue::Object(ref map) = input.body {
2744 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2745 map.keys().map(String::as_str),
2746 &format!("document '{}'", input.collection),
2747 )?;
2748 }
2749
2750 let body_bytes = json_to_vec(&input.body).map_err(|err| {
2752 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2753 })?;
2754 let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2755 "body".to_string(),
2756 crate::storage::schema::Value::Json(body_bytes),
2757 )];
2758
2759 if let JsonValue::Object(ref map) = input.body {
2761 for (key, value) in map {
2762 let storage_value = json_to_storage_value(value)?;
2763 fields.push((key.clone(), storage_value));
2764 }
2765 }
2766
2767 let mut metadata = input.metadata;
2768 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2769 let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2770 .iter()
2771 .map(|(key, value)| (key.as_str(), value.clone()))
2772 .collect();
2773 let mut builder = db.row(&input.collection, columns);
2774
2775 for (key, value) in metadata {
2776 builder = builder.metadata(key, value);
2777 }
2778
2779 for node in input.node_links {
2780 builder = builder.link_to_node(node);
2781 }
2782
2783 for vector in input.vector_links {
2784 builder = builder.link_to_vector(vector);
2785 }
2786
2787 let id = builder.save()?;
2788 self.stamp_xmin_if_in_txn(&input.collection, id);
2790 refresh_context_index(&db, &input.collection, id)?;
2791 self.cdc_emit(
2792 crate::replication::cdc::ChangeOperation::Insert,
2793 &input.collection,
2794 id.raw(),
2795 "document",
2796 );
2797 Ok(CreateEntityOutput {
2798 id,
2799 entity: db.store().get(&input.collection, id),
2800 })
2801 }
2802
2803 fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2804 let db = self.db();
2805 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2806 let declared_model = db
2807 .collection_contract(&input.collection)
2808 .map(|contract| contract.declared_model);
2809 let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2810 contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2811 self.seal_vault_value(&input.collection, input.value)?
2812 } else {
2813 contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2814 input.value
2815 };
2816 let fields = vec![
2817 (
2818 "key".to_string(),
2819 crate::storage::schema::Value::text(input.key),
2820 ),
2821 ("value".to_string(), value),
2822 ];
2823 let collection = input.collection;
2824 let result = self.mutation_engine().apply(
2825 collection.clone(),
2826 vec![crate::runtime::mutation::MutationRow {
2827 fields,
2828 metadata: input.metadata,
2829 node_links: Vec::new(),
2830 vector_links: Vec::new(),
2831 }],
2832 )?;
2833 let id = result.ids[0];
2834 Ok(CreateEntityOutput {
2835 id,
2836 entity: db.store().get(&collection, id),
2837 })
2838 }
2839
2840 fn create_timeseries_point(
2841 &self,
2842 input: CreateTimeSeriesPointInput,
2843 ) -> RedDBResult<CreateEntityOutput> {
2844 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2845 let db = self.db();
2846 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2847 contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2848
2849 let mut fields = vec![
2850 (
2851 "metric".to_string(),
2852 crate::storage::schema::Value::text(input.metric),
2853 ),
2854 (
2855 "value".to_string(),
2856 crate::storage::schema::Value::Float(input.value),
2857 ),
2858 ];
2859
2860 if let Some(timestamp_ns) = input.timestamp_ns {
2861 fields.push((
2862 "timestamp".to_string(),
2863 crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2864 ));
2865 }
2866
2867 if !input.tags.is_empty() {
2868 let tags_json = JsonValue::Object(
2869 input
2870 .tags
2871 .into_iter()
2872 .map(|(key, value)| (key, JsonValue::String(value)))
2873 .collect(),
2874 );
2875 let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2876 crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2877 })?;
2878 fields.push((
2879 "tags".to_string(),
2880 crate::storage::schema::Value::Json(tags_bytes),
2881 ));
2882 }
2883
2884 let collection = input.collection;
2885 let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2886 self.stamp_xmin_if_in_txn(&collection, id);
2889 refresh_context_index(&db, &collection, id)?;
2890
2891 Ok(CreateEntityOutput {
2892 id,
2893 entity: db.store().get(&collection, id),
2894 })
2895 }
2896
2897 fn get_kv(
2898 &self,
2899 collection: &str,
2900 key: &str,
2901 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2902 let db = self.db();
2903 ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2904 let store = db.store();
2905 let Some(manager) = store.get_collection(collection) else {
2906 return Ok(None);
2907 };
2908 let entities = manager.query_all(|_| true);
2909 for entity in entities {
2910 if let crate::storage::EntityData::Row(ref row) = entity.data {
2911 if let Some(ref named) = row.named {
2912 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2913 if &**k == key {
2914 let value = named
2915 .get("value")
2916 .cloned()
2917 .unwrap_or(crate::storage::schema::Value::Null);
2918 return Ok(Some((value, entity.id)));
2919 }
2920 }
2921 }
2922 }
2923 }
2924 Ok(None)
2925 }
2926
2927 fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2928 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2929 let found = self.get_kv(collection, key)?;
2930 if let Some((_, id)) = found {
2931 let db = self.db();
2932 let store = db.store();
2933 let deleted = store
2934 .delete(collection, id)
2935 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2936 if deleted {
2937 store.context_index().remove_entity(id);
2938 }
2939 Ok(deleted)
2940 } else {
2941 Ok(false)
2942 }
2943 }
2944
2945 fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2946 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2947 let PatchEntityInput {
2948 collection,
2949 id,
2950 payload,
2951 operations,
2952 } = input;
2953 let db = self.db();
2954 let store = db.store();
2955 let Some(manager) = store.get_collection(&collection) else {
2956 return Err(crate::RedDBError::NotFound(format!(
2957 "collection not found: {collection}"
2958 )));
2959 };
2960 let Some(entity) = manager.get(id) else {
2961 return Err(crate::RedDBError::NotFound(format!(
2962 "entity not found: {}",
2963 id.raw()
2964 )));
2965 };
2966 self.apply_loaded_patch_entity(collection, entity, payload, operations)
2967 }
2968
2969 fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
2970 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2971 let store = self.db().store();
2972 let pre_delete_fields = store
2976 .get(&input.collection, input.id)
2977 .as_ref()
2978 .map(entity_row_fields_snapshot)
2979 .unwrap_or_default();
2980 let deleted = store
2984 .delete(&input.collection, input.id)
2985 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2986 if deleted {
2987 store.context_index().remove_entity(input.id);
2988 if !pre_delete_fields.is_empty() {
2991 self.index_store_ref()
2992 .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
2993 .map_err(crate::RedDBError::Internal)?;
2994 }
2995 self.cdc_emit(
2996 crate::replication::cdc::ChangeOperation::Delete,
2997 &input.collection,
2998 input.id.raw(),
2999 "entity",
3000 );
3001 }
3002 Ok(DeleteEntityOutput {
3003 deleted,
3004 id: input.id,
3005 })
3006 }
3007}