1use std::collections::HashMap;
2
3use crate::application::entity::{
4 AppliedEntityMutation, CreateDocumentInput, CreateKvInput, CreateTimeSeriesPointInput,
5 RowUpdateColumnRule, RowUpdateContractPlan,
6};
7use crate::application::ttl_payload::{
8 has_internal_ttl_metadata, normalize_ttl_patch_operations, parse_top_level_ttl_metadata_entries,
9};
10use crate::json::{to_vec as json_to_vec, Value as JsonValue};
11use crate::storage::query::resolve_declared_data_type;
12use crate::storage::schema::{coerce as coerce_schema_value, DataType, Value};
13use crate::storage::unified::MetadataValue;
14
15use super::*;
16
17const TREE_METADATA_PREFIX: &str = "red.tree.";
18const TREE_CHILD_EDGE_LABEL: &str = "TREE_CHILD";
19
20fn apply_collection_default_ttl(
21 db: &crate::storage::unified::devx::RedDB,
22 collection: &str,
23 metadata: &mut Vec<(String, MetadataValue)>,
24) {
25 if has_internal_ttl_metadata(metadata) {
26 return;
27 }
28
29 let Some(default_ttl_ms) = db.collection_default_ttl_ms(collection) else {
30 return;
31 };
32
33 metadata.push((
34 "_ttl_ms".to_string(),
35 if default_ttl_ms <= i64::MAX as u64 {
36 MetadataValue::Int(default_ttl_ms as i64)
37 } else {
38 MetadataValue::Timestamp(default_ttl_ms)
39 },
40 ));
41}
42
43fn refresh_context_index(
44 db: &crate::storage::unified::devx::RedDB,
45 collection: &str,
46 id: crate::storage::EntityId,
47) -> RedDBResult<()> {
48 let store = db.store();
49 let Some(entity) = store.get(collection, id) else {
50 return Ok(());
51 };
52
53 store.context_index().index_entity(collection, &entity);
54 Ok(())
55}
56
57pub(crate) fn entity_row_fields_snapshot(
63 entity: &crate::storage::UnifiedEntity,
64) -> Vec<(String, Value)> {
65 let crate::storage::EntityData::Row(row) = &entity.data else {
66 return Vec::new();
67 };
68 if let Some(named) = &row.named {
69 return named.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
70 }
71 if let Some(schema) = &row.schema {
72 return schema
73 .iter()
74 .zip(row.columns.iter())
75 .map(|(name, value)| (name.clone(), value.clone()))
76 .collect();
77 }
78 Vec::new()
79}
80
81fn ensure_collection_model_contract(
82 db: &crate::storage::unified::devx::RedDB,
83 collection: &str,
84 requested_model: crate::catalog::CollectionModel,
85) -> RedDBResult<()> {
86 if let Some(contract) = db.collection_contract(collection) {
87 if !contract_enforces_model(&contract) {
88 return Ok(());
89 }
90 if collection_model_allows(contract.declared_model, requested_model) {
91 return Ok(());
92 }
93 return Err(crate::RedDBError::InvalidOperation(format!(
94 "collection '{}' is declared as '{}' and does not allow '{}' writes",
95 collection,
96 collection_model_name(contract.declared_model),
97 collection_model_name(requested_model)
98 )));
99 }
100
101 let now = implicit_contract_unix_ms();
102 db.save_collection_contract(crate::physical::CollectionContract {
103 name: collection.to_string(),
104 declared_model: requested_model,
105 schema_mode: crate::catalog::SchemaMode::Dynamic,
106 origin: crate::physical::ContractOrigin::Implicit,
107 version: 1,
108 created_at_unix_ms: now,
109 updated_at_unix_ms: now,
110 default_ttl_ms: db.collection_default_ttl_ms(collection),
111 vector_dimension: None,
112 vector_metric: None,
113 context_index_fields: Vec::new(),
114 declared_columns: Vec::new(),
115 table_def: matches!(requested_model, crate::catalog::CollectionModel::Table)
116 .then(|| crate::storage::schema::TableDef::new(collection.to_string())),
117 timestamps_enabled: false,
118 context_index_enabled: false,
119 metrics_raw_retention_ms: None,
120 metrics_rollup_policies: Vec::new(),
121 metrics_tenant_identity: None,
122 metrics_namespace: None,
123 append_only: false,
126 subscriptions: Vec::new(),
127 analytics_config: Vec::new(),
128 session_key: None,
129 session_gap_ms: None,
130 retention_duration_ms: None,
131 })
132 .map(|_| ())
133 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
134}
135
136fn implicit_contract_unix_ms() -> u128 {
137 std::time::SystemTime::now()
138 .duration_since(std::time::UNIX_EPOCH)
139 .unwrap_or_default()
140 .as_millis()
141}
142
143fn collection_model_allows(
144 declared_model: crate::catalog::CollectionModel,
145 requested_model: crate::catalog::CollectionModel,
146) -> bool {
147 declared_model == requested_model || declared_model == crate::catalog::CollectionModel::Mixed
148}
149
150fn ensure_vector_dimension_contract(
151 db: &crate::storage::unified::devx::RedDB,
152 collection: &str,
153 actual_dimension: usize,
154) -> RedDBResult<()> {
155 let Some(expected_dimension) = db
156 .collection_contract(collection)
157 .and_then(|contract| contract.vector_dimension)
158 else {
159 return Ok(());
160 };
161 if expected_dimension == actual_dimension {
162 return Ok(());
163 }
164 Err(crate::RedDBError::Query(format!(
165 "vector dimension mismatch for collection '{collection}': expected {expected_dimension}, got {actual_dimension}"
166 )))
167}
168
169fn collection_model_name(model: crate::catalog::CollectionModel) -> &'static str {
170 match model {
171 crate::catalog::CollectionModel::Table => "table",
172 crate::catalog::CollectionModel::Document => "document",
173 crate::catalog::CollectionModel::Graph => "graph",
174 crate::catalog::CollectionModel::Vector => "vector",
175 crate::catalog::CollectionModel::Hll => "hll",
176 crate::catalog::CollectionModel::Sketch => "sketch",
177 crate::catalog::CollectionModel::Filter => "filter",
178 crate::catalog::CollectionModel::Kv => "kv",
179 crate::catalog::CollectionModel::Config => "config",
180 crate::catalog::CollectionModel::Vault => "vault",
181 crate::catalog::CollectionModel::Mixed => "mixed",
182 crate::catalog::CollectionModel::TimeSeries => "timeseries",
183 crate::catalog::CollectionModel::Queue => "queue",
184 crate::catalog::CollectionModel::Metrics => "metrics",
185 }
186}
187
188#[derive(Clone)]
189struct UniquenessRule {
190 name: String,
191 columns: Vec<String>,
192 primary_key: bool,
193}
194
195#[derive(Copy, Clone, PartialEq, Eq)]
196enum NormalizeMode {
197 Insert,
201 Update,
205}
206
207mod collection_contract_enforcement {
208 use super::*;
209
210 pub(super) struct CollectionContractWriteEnforcer<'a> {
211 db: &'a crate::storage::unified::devx::RedDB,
212 collection: &'a str,
213 }
214
215 impl<'a> CollectionContractWriteEnforcer<'a> {
216 pub(super) fn new(
217 db: &'a crate::storage::unified::devx::RedDB,
218 collection: &'a str,
219 ) -> Self {
220 Self { db, collection }
221 }
222
223 pub(super) fn ensure_model(
224 &self,
225 requested_model: crate::catalog::CollectionModel,
226 ) -> RedDBResult<()> {
227 ensure_collection_model_contract(self.db, self.collection, requested_model)
228 }
229
230 pub(super) fn normalize_insert_fields(
231 &self,
232 fields: Vec<(String, Value)>,
233 ) -> RedDBResult<Vec<(String, Value)>> {
234 normalize_row_fields_for_contract_with_mode(
235 self.db,
236 self.collection,
237 fields,
238 NormalizeMode::Insert,
239 )
240 }
241
242 pub(super) fn normalize_update_fields(
243 &self,
244 fields: Vec<(String, Value)>,
245 ) -> RedDBResult<Vec<(String, Value)>> {
246 normalize_row_fields_for_contract_with_mode(
247 self.db,
248 self.collection,
249 fields,
250 NormalizeMode::Update,
251 )
252 }
253
254 pub(super) fn enforce_row_uniqueness(
255 &self,
256 fields: &[(String, Value)],
257 exclude_id: Option<crate::storage::EntityId>,
258 ) -> RedDBResult<()> {
259 enforce_row_uniqueness(self.db, self.collection, fields, exclude_id)
260 }
261
262 pub(super) fn enforce_batch_uniqueness(
263 &self,
264 rows: &[Vec<(String, Value)>],
265 ) -> RedDBResult<()> {
266 enforce_row_batch_uniqueness(self.db, self.collection, rows)
267 }
268
269 pub(super) fn requires_uniqueness_check(&self, modified_columns: &[String]) -> bool {
270 row_update_requires_uniqueness_check(self.db, self.collection, modified_columns)
271 }
272 }
273}
274
275use collection_contract_enforcement::CollectionContractWriteEnforcer;
276
277fn normalize_row_fields_for_contract_with_mode(
278 db: &crate::storage::unified::devx::RedDB,
279 collection: &str,
280 fields: Vec<(String, Value)>,
281 mode: NormalizeMode,
282) -> RedDBResult<Vec<(String, Value)>> {
283 let Some(contract) = db.collection_contract(collection) else {
284 return Ok(fields);
285 };
286
287 if contract.declared_model != crate::catalog::CollectionModel::Table
288 || (contract.declared_columns.is_empty()
289 && contract
290 .table_def
291 .as_ref()
292 .map(|table| table.columns.is_empty())
293 .unwrap_or(true))
294 {
295 return Ok(fields);
296 }
297
298 let existing_created_at = if contract.timestamps_enabled && mode == NormalizeMode::Update {
308 fields
309 .iter()
310 .find(|(n, _)| n == "created_at")
311 .map(|(_, v)| v.clone())
312 } else {
313 None
314 };
315
316 if contract.timestamps_enabled && mode == NormalizeMode::Insert {
321 for (name, _) in &fields {
322 if name == "created_at" || name == "updated_at" {
323 return Err(crate::RedDBError::Query(format!(
324 "collection '{}' manages '{}' automatically — do not set it in INSERT",
325 collection, name
326 )));
327 }
328 }
329 }
330
331 let mut provided = std::collections::BTreeMap::new();
332 for (name, value) in &fields {
333 provided.insert(name.clone(), value.clone());
334 }
335
336 let resolved_columns = resolved_contract_columns(&contract)?;
337 let declared_names: std::collections::BTreeSet<String> = resolved_columns
338 .iter()
339 .map(|column| column.name.clone())
340 .collect();
341 let unknown_fields: Vec<String> = fields
342 .iter()
343 .filter(|(name, _)| !declared_names.contains(name))
344 .map(|(name, _)| name.clone())
345 .collect();
346 if matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict)
347 && !unknown_fields.is_empty()
348 {
349 return Err(crate::RedDBError::Query(format!(
350 "collection '{}' is strict and does not allow undeclared fields: {}",
351 collection,
352 unknown_fields.join(", ")
353 )));
354 }
355 let mut normalized = Vec::new();
356 let now_ms = current_unix_ms_u64();
357
358 for column in &resolved_columns {
359 match provided.remove(&column.name) {
360 Some(value) => {
361 if contract.timestamps_enabled && mode == NormalizeMode::Update {
366 match column.name.as_str() {
367 "created_at" => {
368 normalized.push((
369 column.name.clone(),
370 existing_created_at
371 .clone()
372 .unwrap_or(Value::UnsignedInteger(now_ms)),
373 ));
374 continue;
375 }
376 "updated_at" => {
377 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
378 continue;
379 }
380 _ => {}
381 }
382 }
383 normalized.push((
384 column.name.clone(),
385 normalize_contract_value(collection, column, value)?,
386 ));
387 }
388 None => {
389 if contract.timestamps_enabled
393 && (column.name == "created_at" || column.name == "updated_at")
394 {
395 normalized.push((column.name.clone(), Value::UnsignedInteger(now_ms)));
396 continue;
397 }
398 if let Some(default) = &column.default {
399 normalized.push((
400 column.name.clone(),
401 coerce_contract_literal(collection, &column.name, column, default)?,
402 ));
403 } else if column.not_null {
404 return Err(crate::RedDBError::Query(format!(
405 "missing required column '{}' for collection '{}'",
406 column.name, collection
407 )));
408 }
409 }
410 }
411 }
412
413 for (name, value) in fields {
414 if !declared_names.contains(&name) {
415 normalized.push((name, value));
416 }
417 }
418
419 Ok(normalized)
420}
421
422fn current_unix_ms_u64() -> u64 {
423 std::time::SystemTime::now()
424 .duration_since(std::time::UNIX_EPOCH)
425 .map(|d| d.as_millis() as u64)
426 .unwrap_or(0)
427}
428
429fn enforce_row_uniqueness(
430 db: &crate::storage::unified::devx::RedDB,
431 collection: &str,
432 fields: &[(String, Value)],
433 exclude_id: Option<crate::storage::EntityId>,
434) -> RedDBResult<()> {
435 let Some(contract) = db.collection_contract(collection) else {
436 return Ok(());
437 };
438 if contract.declared_model != crate::catalog::CollectionModel::Table
439 && contract.declared_model != crate::catalog::CollectionModel::Mixed
440 {
441 return Ok(());
442 }
443
444 let rules = resolved_uniqueness_rules(&contract);
445 if rules.is_empty() {
446 return Ok(());
447 }
448
449 let store = db.store();
450 let Some(manager) = store.get_collection(collection) else {
451 return Ok(());
452 };
453
454 let input_fields: std::collections::BTreeMap<String, Value> = fields.iter().cloned().collect();
455
456 for rule in &rules {
457 let mut expected_signatures = Vec::new();
458 let mut skip_rule = false;
459
460 for column in &rule.columns {
461 match input_fields.get(column) {
462 Some(Value::Null) | None if rule.primary_key => {
463 return Err(crate::RedDBError::Query(format!(
464 "primary key '{}' in collection '{}' requires non-null column '{}'",
465 rule.name, collection, column
466 )))
467 }
468 Some(Value::Null) | None => {
469 skip_rule = true;
470 break;
471 }
472 Some(value) => {
473 expected_signatures.push((column.clone(), value_signature(value)));
474 }
475 }
476 }
477
478 if skip_rule {
479 continue;
480 }
481
482 for entity in manager.query_all(|_| true) {
483 if exclude_id.map(|id| id == entity.id).unwrap_or(false) {
484 continue;
485 }
486 let Some(existing_fields) = row_fields_from_entity(&entity) else {
487 continue;
488 };
489
490 let duplicate = expected_signatures.iter().all(|(column, expected)| {
491 existing_fields
492 .get(column)
493 .map(|value| value_signature(value) == *expected)
494 .unwrap_or(false)
495 });
496
497 if duplicate {
498 let qualifier = if rule.primary_key {
499 "primary key"
500 } else {
501 "unique constraint"
502 };
503 return Err(crate::RedDBError::Query(format!(
504 "{} '{}' violated on collection '{}' for columns [{}]",
505 qualifier,
506 rule.name,
507 collection,
508 rule.columns.join(", ")
509 )));
510 }
511 }
512 }
513
514 Ok(())
515}
516
517fn enforce_row_batch_uniqueness(
518 db: &crate::storage::unified::devx::RedDB,
519 collection: &str,
520 rows: &[Vec<(String, Value)>],
521) -> RedDBResult<()> {
522 let Some(contract) = db.collection_contract(collection) else {
523 return Ok(());
524 };
525 if contract.declared_model != crate::catalog::CollectionModel::Table
526 && contract.declared_model != crate::catalog::CollectionModel::Mixed
527 {
528 return Ok(());
529 }
530
531 let rules = resolved_uniqueness_rules(&contract);
532 if rules.is_empty() {
533 return Ok(());
534 }
535
536 for rule in &rules {
537 let mut seen = std::collections::HashMap::<String, usize>::new();
538 for (row_index, fields) in rows.iter().enumerate() {
539 let input_fields: std::collections::BTreeMap<String, Value> =
540 fields.iter().cloned().collect();
541 let mut signatures = Vec::new();
542 let mut skip_rule = false;
543
544 for column in &rule.columns {
545 match input_fields.get(column) {
546 Some(Value::Null) | None if rule.primary_key => {
547 return Err(crate::RedDBError::Query(format!(
548 "primary key '{}' in collection '{}' requires non-null column '{}'",
549 rule.name, collection, column
550 )))
551 }
552 Some(Value::Null) | None => {
553 skip_rule = true;
554 break;
555 }
556 Some(value) => signatures.push(format!("{column}={}", value_signature(value))),
557 }
558 }
559
560 if skip_rule {
561 continue;
562 }
563
564 let signature = signatures.join("|");
565 if let Some(previous_index) = seen.insert(signature, row_index) {
566 return Err(crate::RedDBError::Query(format!(
567 "batch insert violates uniqueness rule '{}' in collection '{}' between rows {} and {}",
568 rule.name,
569 collection,
570 previous_index + 1,
571 row_index + 1
572 )));
573 }
574 }
575 }
576
577 Ok(())
578}
579
580fn row_update_requires_uniqueness_check(
581 db: &crate::storage::unified::devx::RedDB,
582 collection: &str,
583 modified_columns: &[String],
584) -> bool {
585 if modified_columns.is_empty() {
586 return false;
587 }
588
589 let Some(contract) = db.collection_contract(collection) else {
590 return false;
591 };
592 if contract.declared_model != crate::catalog::CollectionModel::Table
593 && contract.declared_model != crate::catalog::CollectionModel::Mixed
594 {
595 return false;
596 }
597
598 let rules = resolved_uniqueness_rules(&contract);
599 if rules.is_empty() {
600 return false;
601 }
602
603 rules.iter().any(|rule| {
604 rule.columns.iter().any(|column| {
605 modified_columns
606 .iter()
607 .any(|modified| modified.eq_ignore_ascii_case(column))
608 })
609 })
610}
611
612pub(crate) fn build_row_update_contract_plan(
613 db: &crate::storage::unified::devx::RedDB,
614 collection: &str,
615) -> RedDBResult<Option<RowUpdateContractPlan>> {
616 let Some(contract) = db.collection_contract(collection) else {
617 return Ok(None);
618 };
619
620 let declared_rules = if contract.declared_model == crate::catalog::CollectionModel::Table
621 && !(contract.declared_columns.is_empty()
622 && contract
623 .table_def
624 .as_ref()
625 .map(|table| table.columns.is_empty())
626 .unwrap_or(true))
627 {
628 resolved_contract_columns(&contract)?
629 .into_iter()
630 .map(|rule| {
631 (
632 rule.name.clone(),
633 RowUpdateColumnRule {
634 name: rule.name,
635 data_type: rule.data_type,
636 data_type_name: rule.data_type_name,
637 not_null: rule.not_null,
638 enum_variants: rule.enum_variants,
639 },
640 )
641 })
642 .collect()
643 } else {
644 HashMap::new()
645 };
646
647 let unique_columns = if matches!(
648 contract.declared_model,
649 crate::catalog::CollectionModel::Table | crate::catalog::CollectionModel::Mixed
650 ) {
651 resolved_uniqueness_rules(&contract)
652 .into_iter()
653 .flat_map(|rule| rule.columns.into_iter())
654 .map(|column| (column, ()))
655 .collect()
656 } else {
657 HashMap::new()
658 };
659
660 Ok(Some(RowUpdateContractPlan {
661 timestamps_enabled: contract.timestamps_enabled,
662 strict_schema: matches!(contract.schema_mode, crate::catalog::SchemaMode::Strict),
663 declared_rules,
664 unique_columns,
665 }))
666}
667
668pub(crate) fn normalize_row_update_assignment_with_plan(
669 collection: &str,
670 column: &str,
671 value: Value,
672 row_contract_plan: Option<&RowUpdateContractPlan>,
673) -> RedDBResult<Value> {
674 let Some(plan) = row_contract_plan else {
675 return Ok(value);
676 };
677
678 if plan.timestamps_enabled && (column == "created_at" || column == "updated_at") {
679 return Err(crate::RedDBError::Query(format!(
680 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
681 collection, column
682 )));
683 }
684
685 if let Some(rule) = plan.declared_rules.get(column) {
686 let rule = ResolvedColumnRule {
687 name: rule.name.clone(),
688 data_type: rule.data_type,
689 data_type_name: rule.data_type_name.clone(),
690 not_null: rule.not_null,
691 default: None,
692 enum_variants: rule.enum_variants.clone(),
693 };
694 normalize_contract_value(collection, &rule, value)
695 } else if plan.strict_schema {
696 Err(crate::RedDBError::Query(format!(
697 "collection '{}' is strict and does not allow undeclared fields: {}",
698 collection, column
699 )))
700 } else {
701 Ok(value)
702 }
703}
704
705pub(crate) fn normalize_row_update_value_for_rule(
706 collection: &str,
707 value: Value,
708 row_rule: Option<&RowUpdateColumnRule>,
709) -> RedDBResult<Value> {
710 let Some(rule) = row_rule else {
711 return Ok(value);
712 };
713
714 let rule = ResolvedColumnRule {
715 name: rule.name.clone(),
716 data_type: rule.data_type,
717 data_type_name: rule.data_type_name.clone(),
718 not_null: rule.not_null,
719 default: None,
720 enum_variants: rule.enum_variants.clone(),
721 };
722 normalize_contract_value(collection, &rule, value)
723}
724
725fn set_row_field(row: &mut crate::storage::unified::entity::RowData, name: &str, value: Value) {
726 if let Some(named) = row.named.as_mut() {
727 named.insert(name.to_string(), value);
728 return;
729 }
730
731 if let Some(schema) = row.schema.as_ref() {
732 if let Some(index) = schema.iter().position(|column| column == name) {
733 if let Some(slot) = row.columns.get_mut(index) {
734 *slot = value;
735 return;
736 }
737 }
738
739 let mut named = HashMap::with_capacity(schema.len().saturating_add(1));
740 for (column, current) in schema.iter().zip(row.columns.iter()) {
741 named.insert(column.clone(), current.clone());
742 }
743 named.insert(name.to_string(), value);
744 row.named = Some(named);
745 return;
746 }
747
748 let mut named = HashMap::with_capacity(1);
749 named.insert(name.to_string(), value);
750 row.named = Some(named);
751}
752
753fn collect_row_fields(row: &crate::storage::unified::entity::RowData) -> Vec<(String, Value)> {
754 if let Some(named) = row.named.as_ref() {
755 named
756 .iter()
757 .map(|(key, value)| (key.clone(), value.clone()))
758 .collect()
759 } else if let Some(schema) = row.schema.as_ref() {
760 schema
761 .iter()
762 .cloned()
763 .zip(row.columns.iter().cloned())
764 .collect()
765 } else {
766 Vec::new()
767 }
768}
769
770fn apply_row_field_assignments_raw<I>(
771 row: &mut crate::storage::unified::entity::RowData,
772 field_assignments: I,
773) where
774 I: IntoIterator<Item = (String, Value)>,
775{
776 for (column, value) in field_assignments {
777 set_row_field(row, &column, value);
778 }
779}
780
781fn apply_row_field_assignments_incremental<I>(
782 collection: &str,
783 row: &mut crate::storage::unified::entity::RowData,
784 field_assignments: I,
785 row_contract_plan: Option<&RowUpdateContractPlan>,
786) -> RedDBResult<()>
787where
788 I: IntoIterator<Item = (String, Value)>,
789{
790 for (column, value) in field_assignments {
791 let value = normalize_row_update_assignment_with_plan(
792 collection,
793 &column,
794 value,
795 row_contract_plan,
796 )?;
797
798 set_row_field(row, &column, value);
799 }
800
801 Ok(())
802}
803
804fn resolved_uniqueness_rules(
805 contract: &crate::physical::CollectionContract,
806) -> Vec<UniquenessRule> {
807 let mut rules = Vec::new();
808
809 if let Some(table_def) = &contract.table_def {
810 if !table_def.primary_key.is_empty() {
811 rules.push(UniquenessRule {
812 name: "primary_key".to_string(),
813 columns: table_def.primary_key.clone(),
814 primary_key: true,
815 });
816 }
817
818 for constraint in &table_def.constraints {
819 if matches!(
820 constraint.constraint_type,
821 crate::storage::schema::ConstraintType::PrimaryKey
822 ) && !constraint.columns.is_empty()
823 {
824 rules.push(UniquenessRule {
825 name: constraint.name.clone(),
826 columns: constraint.columns.clone(),
827 primary_key: true,
828 });
829 } else if matches!(
830 constraint.constraint_type,
831 crate::storage::schema::ConstraintType::Unique
832 ) && !constraint.columns.is_empty()
833 {
834 rules.push(UniquenessRule {
835 name: constraint.name.clone(),
836 columns: constraint.columns.clone(),
837 primary_key: false,
838 });
839 }
840 }
841 } else {
842 for column in &contract.declared_columns {
843 if column.primary_key {
844 rules.push(UniquenessRule {
845 name: format!("pk_{}", column.name),
846 columns: vec![column.name.clone()],
847 primary_key: true,
848 });
849 } else if column.unique {
850 rules.push(UniquenessRule {
851 name: format!("uniq_{}", column.name),
852 columns: vec![column.name.clone()],
853 primary_key: false,
854 });
855 }
856 }
857 }
858
859 let mut dedup = std::collections::BTreeSet::new();
860 rules
861 .into_iter()
862 .filter(|rule| dedup.insert((rule.primary_key, rule.columns.clone())))
863 .collect()
864}
865
866fn row_fields_from_entity(
867 entity: &crate::storage::UnifiedEntity,
868) -> Option<std::collections::BTreeMap<String, Value>> {
869 match &entity.data {
870 crate::storage::EntityData::Row(row) => {
871 if let Some(named) = &row.named {
872 Some(
873 named
874 .iter()
875 .map(|(key, value)| (key.clone(), value.clone()))
876 .collect(),
877 )
878 } else {
879 row.schema.as_ref().map(|schema| {
880 schema
881 .iter()
882 .cloned()
883 .zip(row.columns.iter().cloned())
884 .collect()
885 })
886 }
887 }
888 _ => None,
889 }
890}
891
892fn value_signature(value: &Value) -> String {
893 format!("{value:?}")
894}
895
896fn normalize_contract_value(
897 collection: &str,
898 column: &ResolvedColumnRule,
899 value: Value,
900) -> RedDBResult<Value> {
901 if matches!(value, Value::Null) {
902 if column.not_null {
903 return Err(crate::RedDBError::Query(format!(
904 "column '{}' in collection '{}' cannot be null",
905 column.name, collection
906 )));
907 }
908 return Ok(Value::Null);
909 }
910
911 let target = column.data_type;
912 if value_matches_declared_type(&value, target) {
913 return Ok(value);
914 }
915
916 let Some(raw) = value_to_coercion_input(&value) else {
917 return Err(crate::RedDBError::Query(format!(
918 "column '{}' in collection '{}' requires type '{}' but value cannot be coerced",
919 column.name, collection, column.data_type_name
920 )));
921 };
922
923 coerce_contract_literal(collection, &column.name, column, &raw)
924}
925
926fn coerce_contract_literal(
927 collection: &str,
928 column_name: &str,
929 column: &ResolvedColumnRule,
930 raw: &str,
931) -> RedDBResult<Value> {
932 let target = column.data_type;
933 match target {
934 DataType::Blob => Ok(Value::Blob(raw.as_bytes().to_vec())),
935 DataType::Json => Ok(Value::Json(raw.as_bytes().to_vec())),
936 DataType::Timestamp => raw.parse::<i64>().map(Value::Timestamp).map_err(|err| {
937 crate::RedDBError::Query(format!(
938 "failed to coerce column '{}' in collection '{}' to '{}': {}",
939 column_name, collection, column.data_type_name, err
940 ))
941 }),
942 DataType::Duration => raw.parse::<i64>().map(Value::Duration).map_err(|err| {
943 crate::RedDBError::Query(format!(
944 "failed to coerce column '{}' in collection '{}' to '{}': {}",
945 column_name, collection, column.data_type_name, err
946 ))
947 }),
948 DataType::Vector | DataType::Array => Err(crate::RedDBError::Query(format!(
949 "column '{}' in collection '{}' requires '{}' and only typed values are accepted for this type",
950 column_name, collection, column.data_type_name
951 ))),
952 _ => coerce_schema_value(raw, target, Some(column.enum_variants.as_slice())).map_err(
953 |err| {
954 crate::RedDBError::Query(format!(
955 "failed to coerce column '{}' in collection '{}' to '{}': {}",
956 column_name, collection, column.data_type_name, err
957 ))
958 },
959 ),
960 }
961}
962
963struct ResolvedColumnRule {
964 name: String,
965 data_type: DataType,
966 data_type_name: String,
967 not_null: bool,
968 default: Option<String>,
969 enum_variants: Vec<String>,
970}
971
972fn resolved_contract_columns(
973 contract: &crate::physical::CollectionContract,
974) -> RedDBResult<Vec<ResolvedColumnRule>> {
975 if let Some(table_def) = &contract.table_def {
976 return Ok(table_def
977 .columns
978 .iter()
979 .map(|column| ResolvedColumnRule {
980 name: column.name.clone(),
981 data_type: column.data_type,
982 data_type_name: data_type_name(column.data_type).to_string(),
983 not_null: !column.nullable,
984 default: column
985 .default
986 .as_ref()
987 .map(|bytes| String::from_utf8_lossy(bytes).to_string()),
988 enum_variants: column.enum_variants.clone(),
989 })
990 .collect());
991 }
992
993 contract
994 .declared_columns
995 .iter()
996 .map(|column| {
997 let data_type = column
998 .sql_type
999 .as_ref()
1000 .map(crate::storage::query::resolve_sql_type_name)
1001 .transpose()
1002 .map_err(|err| crate::RedDBError::Query(err.to_string()))?
1003 .unwrap_or(parse_declared_data_type(&column.data_type)?);
1004 Ok(ResolvedColumnRule {
1005 name: column.name.clone(),
1006 data_type,
1007 data_type_name: column.data_type.clone(),
1008 not_null: column.not_null,
1009 default: column.default.clone(),
1010 enum_variants: column.enum_variants.clone(),
1011 })
1012 })
1013 .collect()
1014}
1015
1016fn parse_declared_data_type(value: &str) -> RedDBResult<DataType> {
1017 resolve_declared_data_type(value).map_err(|err| crate::RedDBError::Query(err.to_string()))
1018}
1019
1020fn data_type_name(data_type: DataType) -> &'static str {
1021 match data_type {
1022 DataType::Integer => "integer",
1023 DataType::UnsignedInteger => "unsigned_integer",
1024 DataType::Float => "float",
1025 DataType::Text => "text",
1026 DataType::Blob => "blob",
1027 DataType::Boolean => "boolean",
1028 DataType::Timestamp => "timestamp",
1029 DataType::Duration => "duration",
1030 DataType::IpAddr => "ipaddr",
1031 DataType::MacAddr => "macaddr",
1032 DataType::Vector => "vector",
1033 DataType::Nullable => "nullable",
1034 DataType::Unknown => "unknown",
1035 DataType::Json => "json",
1036 DataType::Uuid => "uuid",
1037 DataType::NodeRef => "noderef",
1038 DataType::EdgeRef => "edgeref",
1039 DataType::VectorRef => "vectorref",
1040 DataType::RowRef => "rowref",
1041 DataType::Color => "color",
1042 DataType::Email => "email",
1043 DataType::Url => "url",
1044 DataType::Phone => "phone",
1045 DataType::Semver => "semver",
1046 DataType::Cidr => "cidr",
1047 DataType::Date => "date",
1048 DataType::Time => "time",
1049 DataType::Decimal => "decimal",
1050 DataType::Enum => "enum",
1051 DataType::Array => "array",
1052 DataType::TimestampMs => "timestamp_ms",
1053 DataType::Ipv4 => "ipv4",
1054 DataType::Ipv6 => "ipv6",
1055 DataType::Subnet => "subnet",
1056 DataType::Port => "port",
1057 DataType::Latitude => "latitude",
1058 DataType::Longitude => "longitude",
1059 DataType::GeoPoint => "geopoint",
1060 DataType::Country2 => "country2",
1061 DataType::Country3 => "country3",
1062 DataType::Lang2 => "lang2",
1063 DataType::Lang5 => "lang5",
1064 DataType::Currency => "currency",
1065 DataType::AssetCode => "asset_code",
1066 DataType::Money => "money",
1067 DataType::ColorAlpha => "color_alpha",
1068 DataType::BigInt => "bigint",
1069 DataType::KeyRef => "keyref",
1070 DataType::DocRef => "docref",
1071 DataType::TableRef => "tableref",
1072 DataType::PageRef => "pageref",
1073 DataType::Secret => "secret",
1074 DataType::Password => "password",
1075 DataType::TextZstd => "text",
1076 DataType::BlobZstd => "blob",
1077 }
1078}
1079
1080fn value_matches_declared_type(value: &Value, target: DataType) -> bool {
1081 matches!(
1082 (value, target),
1083 (Value::Null, _)
1084 | (Value::Integer(_), DataType::Integer)
1085 | (Value::UnsignedInteger(_), DataType::UnsignedInteger)
1086 | (Value::Float(_), DataType::Float)
1087 | (Value::Text(_), DataType::Text)
1088 | (Value::Blob(_), DataType::Blob)
1089 | (Value::Boolean(_), DataType::Boolean)
1090 | (Value::Timestamp(_), DataType::Timestamp)
1091 | (Value::Duration(_), DataType::Duration)
1092 | (Value::IpAddr(_), DataType::IpAddr)
1093 | (Value::MacAddr(_), DataType::MacAddr)
1094 | (Value::Vector(_), DataType::Vector)
1095 | (Value::Json(_), DataType::Json)
1096 | (Value::Uuid(_), DataType::Uuid)
1097 | (Value::NodeRef(_), DataType::NodeRef)
1098 | (Value::EdgeRef(_), DataType::EdgeRef)
1099 | (Value::VectorRef(_, _), DataType::VectorRef)
1100 | (Value::RowRef(_, _), DataType::RowRef)
1101 | (Value::Color(_), DataType::Color)
1102 | (Value::Email(_), DataType::Email)
1103 | (Value::Url(_), DataType::Url)
1104 | (Value::Phone(_), DataType::Phone)
1105 | (Value::Semver(_), DataType::Semver)
1106 | (Value::Cidr(_, _), DataType::Cidr)
1107 | (Value::Date(_), DataType::Date)
1108 | (Value::Time(_), DataType::Time)
1109 | (Value::Decimal(_), DataType::Decimal)
1110 | (Value::EnumValue(_), DataType::Enum)
1111 | (Value::Array(_), DataType::Array)
1112 | (Value::TimestampMs(_), DataType::TimestampMs)
1113 | (Value::Ipv4(_), DataType::Ipv4)
1114 | (Value::Ipv6(_), DataType::Ipv6)
1115 | (Value::Subnet(_, _), DataType::Subnet)
1116 | (Value::Port(_), DataType::Port)
1117 | (Value::Latitude(_), DataType::Latitude)
1118 | (Value::Longitude(_), DataType::Longitude)
1119 | (Value::GeoPoint(_, _), DataType::GeoPoint)
1120 | (Value::Country2(_), DataType::Country2)
1121 | (Value::Country3(_), DataType::Country3)
1122 | (Value::Lang2(_), DataType::Lang2)
1123 | (Value::Lang5(_), DataType::Lang5)
1124 | (Value::Currency(_), DataType::Currency)
1125 | (Value::ColorAlpha(_), DataType::ColorAlpha)
1126 | (Value::BigInt(_), DataType::BigInt)
1127 | (Value::KeyRef(_, _), DataType::KeyRef)
1128 | (Value::DocRef(_, _), DataType::DocRef)
1129 | (Value::TableRef(_), DataType::TableRef)
1130 | (Value::PageRef(_), DataType::PageRef)
1131 | (Value::Secret(_), DataType::Secret)
1132 | (Value::Password(_), DataType::Password)
1133 )
1134}
1135
1136fn value_to_coercion_input(value: &Value) -> Option<String> {
1137 match value {
1138 Value::Null => None,
1139 Value::Integer(value) => Some(value.to_string()),
1140 Value::UnsignedInteger(value) => Some(value.to_string()),
1141 Value::Float(value) => Some(value.to_string()),
1142 Value::Text(value) => Some(value.to_string()),
1143 Value::Blob(value) => String::from_utf8(value.clone()).ok(),
1144 Value::Boolean(value) => Some(value.to_string()),
1145 Value::Timestamp(value) => Some(value.to_string()),
1146 Value::Duration(value) => Some(value.to_string()),
1147 Value::IpAddr(value) => Some(value.to_string()),
1148 Value::MacAddr(value) => Some(format!(
1149 "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
1150 value[0], value[1], value[2], value[3], value[4], value[5]
1151 )),
1152 Value::Json(value) => Some(String::from_utf8_lossy(value).to_string()),
1153 Value::Email(value) => Some(value.clone()),
1154 Value::Url(value) => Some(value.clone()),
1155 Value::Phone(value) => Some(value.to_string()),
1156 Value::Semver(value) => Some(format!(
1157 "{}.{}.{}",
1158 value / 1_000_000,
1159 (value / 1_000) % 1_000,
1160 value % 1_000
1161 )),
1162 Value::Date(value) => Some(value.to_string()),
1163 Value::Time(value) => Some(value.to_string()),
1164 Value::Decimal(value) => Some(value.to_string()),
1165 Value::TimestampMs(value) => Some(value.to_string()),
1166 Value::Ipv4(value) => Some(format!(
1167 "{}.{}.{}.{}",
1168 (value >> 24) & 0xFF,
1169 (value >> 16) & 0xFF,
1170 (value >> 8) & 0xFF,
1171 value & 0xFF
1172 )),
1173 Value::Port(value) => Some(value.to_string()),
1174 Value::Latitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1175 Value::Longitude(value) => Some((*value as f64 / 1_000_000.0).to_string()),
1176 Value::GeoPoint(lat, lon) => Some(format!(
1177 "{},{}",
1178 *lat as f64 / 1_000_000.0,
1179 *lon as f64 / 1_000_000.0
1180 )),
1181 Value::BigInt(value) => Some(value.to_string()),
1182 Value::TableRef(value) => Some(value.clone()),
1183 Value::PageRef(value) => Some(value.to_string()),
1184 Value::Password(value) => Some(value.clone()),
1185 _ => None,
1186 }
1187}
1188
1189fn dedupe_modified_columns(mut modified_columns: Vec<String>) -> Vec<String> {
1190 if modified_columns.is_empty() {
1191 return modified_columns;
1192 }
1193
1194 let mut unique = Vec::with_capacity(modified_columns.len());
1195 for column in modified_columns.drain(..) {
1196 if !unique
1197 .iter()
1198 .any(|existing: &String| existing.eq_ignore_ascii_case(&column))
1199 {
1200 unique.push(column);
1201 }
1202 }
1203 unique
1204}
1205
1206fn reject_document_array_position_path(path: &[String]) -> RedDBResult<()> {
1207 if path.iter().any(|segment| segment.parse::<usize>().is_ok()) {
1208 return Err(crate::RedDBError::Query(
1209 "array positional document patch paths are unsupported; replace the array or full document body instead"
1210 .to_string(),
1211 ));
1212 }
1213 Ok(())
1214}
1215
1216fn document_body_from_named(fields: &HashMap<String, Value>) -> RedDBResult<JsonValue> {
1217 match fields.get("body") {
1218 Some(Value::Json(bytes)) => crate::json::from_slice(bytes).map_err(|err| {
1219 crate::RedDBError::Query(format!("failed to decode document body: {err}"))
1220 }),
1221 Some(_) => Err(crate::RedDBError::Query(
1222 "document body field must contain JSON".to_string(),
1223 )),
1224 None => Ok(JsonValue::Object(Default::default())),
1225 }
1226}
1227
1228fn replace_document_row_body(
1229 fields: &mut HashMap<String, Value>,
1230 body: JsonValue,
1231 modified_columns: &mut Vec<String>,
1232) -> RedDBResult<()> {
1233 modified_columns.push("body".to_string());
1234
1235 let old_keys: Vec<String> = fields
1236 .keys()
1237 .filter(|key| key.as_str() != "body")
1238 .cloned()
1239 .collect();
1240 for key in old_keys {
1241 fields.remove(&key);
1242 modified_columns.push(key);
1243 }
1244
1245 let body_bytes = json_to_vec(&body).map_err(|err| {
1246 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
1247 })?;
1248 fields.insert("body".to_string(), Value::Json(body_bytes));
1249
1250 if let JsonValue::Object(map) = &body {
1251 for (key, value) in map {
1252 fields.insert(key.clone(), json_to_storage_value(value)?);
1253 modified_columns.push(key.clone());
1254 }
1255 }
1256
1257 Ok(())
1258}
1259
1260impl RedDBRuntime {
1261 pub(crate) fn apply_loaded_patch_entity_core(
1262 &self,
1263 collection: String,
1264 mut entity: crate::storage::UnifiedEntity,
1265 payload: JsonValue,
1266 operations: Vec<PatchEntityOperation>,
1267 ) -> RedDBResult<AppliedEntityMutation> {
1268 let id = entity.id;
1269 let operations = normalize_ttl_patch_operations(operations)?;
1270 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1273
1274 let db = self.db();
1275 let store = db.store();
1276 let Some(manager) = store.get_collection(&collection) else {
1277 return Err(crate::RedDBError::NotFound(format!(
1278 "collection not found: {collection}"
1279 )));
1280 };
1281
1282 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1283 let mut metadata_changed = false;
1284 let mut modified_columns: Vec<String> = Vec::new();
1285 let mut context_index_dirty = false;
1286 let mut graph_node_type: Option<String> = None;
1287 let mut graph_edge_weight: Option<f32> = None;
1288
1289 let row_contract_timestamps = db
1290 .collection_contract(&collection)
1291 .map(|c| c.timestamps_enabled)
1292 .unwrap_or(false);
1293
1294 match &mut entity.data {
1295 crate::storage::EntityData::Row(row) => {
1296 let is_document_collection = db
1297 .collection_contract(&collection)
1298 .map(|contract| {
1299 contract.declared_model == crate::catalog::CollectionModel::Document
1300 })
1301 .unwrap_or(false);
1302 let mut field_ops = Vec::new();
1303 let mut metadata_ops = Vec::new();
1304 let mut document_body_ops = Vec::new();
1305
1306 for mut op in operations {
1307 let Some(root) = op.path.first().map(String::as_str) else {
1308 return Err(crate::RedDBError::Query(
1309 "patch path cannot be empty".to_string(),
1310 ));
1311 };
1312
1313 match root {
1314 "body" if is_document_collection => {
1315 if op.path.len() < 2 {
1316 return Err(crate::RedDBError::Query(
1317 "document body patch paths require a nested key; use payload.body for full replacement"
1318 .to_string(),
1319 ));
1320 }
1321 op.path.remove(0);
1322 reject_document_array_position_path(&op.path)?;
1323 document_body_ops.push(op);
1324 }
1325 "fields" | "named" => {
1326 if op.path.len() < 2 {
1327 return Err(crate::RedDBError::Query(
1328 "patch path 'fields' requires a nested key".to_string(),
1329 ));
1330 }
1331 if row_contract_timestamps {
1332 let leaf = op.path.get(1).map(String::as_str);
1333 if matches!(leaf, Some("created_at") | Some("updated_at")) {
1334 return Err(crate::RedDBError::Query(format!(
1335 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1336 collection,
1337 leaf.unwrap_or("")
1338 )));
1339 }
1340 }
1341 op.path.remove(0);
1342 field_ops.push(op);
1343 }
1344 "metadata" => {
1345 if op.path.len() < 2 {
1346 return Err(crate::RedDBError::Query(
1347 "patch path 'metadata' requires a nested key".to_string(),
1348 ));
1349 }
1350 op.path.remove(0);
1351 metadata_ops.push(op);
1352 }
1353 _ => {
1354 return Err(crate::RedDBError::Query(format!(
1355 "unsupported patch target '{root}' for table rows. Use fields/*, metadata/*, or weight"
1356 )));
1357 }
1358 }
1359 }
1360
1361 if !document_body_ops.is_empty() {
1362 context_index_dirty = true;
1363 let named = row.named.get_or_insert_with(Default::default);
1364 let mut body = document_body_from_named(named)?;
1365 apply_patch_operations_to_json(&mut body, &document_body_ops)
1366 .map_err(crate::RedDBError::Query)?;
1367 replace_document_row_body(named, body, &mut modified_columns)?;
1368 }
1369
1370 if is_document_collection {
1371 if let Some(body) = payload.get("body") {
1372 context_index_dirty = true;
1373 let named = row.named.get_or_insert_with(Default::default);
1374 replace_document_row_body(named, body.clone(), &mut modified_columns)?;
1375 }
1376 }
1377
1378 if !field_ops.is_empty() {
1379 context_index_dirty = true;
1380 for op in &field_ops {
1381 if let Some(col) = op.path.first() {
1382 modified_columns.push(col.clone());
1383 }
1384 }
1385 let named = row.named.get_or_insert_with(Default::default);
1386 apply_patch_operations_to_storage_map(named, &field_ops)?;
1387 }
1388
1389 if let Some(fields) = payload
1390 .get("fields")
1391 .and_then(crate::json::Value::as_object)
1392 {
1393 if row_contract_timestamps {
1394 for key in fields.keys() {
1395 if key == "created_at" || key == "updated_at" {
1396 return Err(crate::RedDBError::Query(format!(
1397 "collection '{}' manages '{}' automatically — do not set it in UPDATE",
1398 collection, key
1399 )));
1400 }
1401 }
1402 }
1403 context_index_dirty = true;
1404 let named = row.named.get_or_insert_with(Default::default);
1405 for (key, value) in fields {
1406 modified_columns.push(key.clone());
1407 named.insert(key.clone(), json_to_storage_value(value)?);
1408 }
1409 }
1410
1411 if !metadata_ops.is_empty() {
1412 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1413 let metadata = patch_metadata.get_or_insert_with(|| {
1414 store.get_metadata(&collection, id).unwrap_or_default()
1415 });
1416 let mut metadata_json = metadata_to_json(metadata);
1417 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1418 .map_err(crate::RedDBError::Query)?;
1419 *metadata = metadata_from_json(&metadata_json)?;
1420 metadata_changed = true;
1421 }
1422
1423 if !modified_columns.is_empty() || row_contract_timestamps {
1424 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1425 let current_fields = if let Some(named) = row.named.take() {
1426 named.into_iter().collect::<Vec<_>>()
1427 } else if let Some(schema) = row.schema.as_ref() {
1428 schema
1429 .iter()
1430 .cloned()
1431 .zip(row.columns.iter().cloned())
1432 .collect::<Vec<_>>()
1433 } else {
1434 Vec::new()
1435 };
1436 let normalized_fields = contract.normalize_update_fields(current_fields)?;
1437 if row_contract_timestamps {
1438 modified_columns.push("updated_at".to_string());
1439 context_index_dirty = true;
1440 }
1441 if contract.requires_uniqueness_check(&modified_columns) {
1442 contract.enforce_row_uniqueness(&normalized_fields, Some(id))?;
1443 }
1444 row.named = Some(normalized_fields.into_iter().collect());
1445 }
1446 }
1447 crate::storage::EntityData::Node(node) => {
1448 let mut field_ops = Vec::new();
1449 let mut metadata_ops = Vec::new();
1450 let mut node_type_ops = Vec::new();
1451
1452 for mut op in operations {
1453 let Some(root) = op.path.first().map(String::as_str) else {
1454 return Err(crate::RedDBError::Query(
1455 "patch path cannot be empty".to_string(),
1456 ));
1457 };
1458
1459 match root {
1460 "fields" | "properties" => {
1461 if op.path.len() < 2 {
1462 return Err(crate::RedDBError::Query(
1463 "patch path 'fields' requires a nested key".to_string(),
1464 ));
1465 }
1466 op.path.remove(0);
1467 field_ops.push(op);
1468 }
1469 "metadata" => {
1470 if op.path.len() < 2 {
1471 return Err(crate::RedDBError::Query(
1472 "patch path 'metadata' requires a nested key".to_string(),
1473 ));
1474 }
1475 op.path.remove(0);
1476 metadata_ops.push(op);
1477 }
1478 "node_type" => {
1479 if op.path.len() != 1 {
1480 return Err(crate::RedDBError::Query(
1481 "patch path 'node_type' does not allow nested keys".to_string(),
1482 ));
1483 }
1484 op.path.clear();
1485 node_type_ops.push(op);
1486 }
1487 _ => {
1488 return Err(crate::RedDBError::Query(format!(
1489 "unsupported patch target '{root}' for graph nodes. Use fields/*, properties/*, node_type, or metadata/*"
1490 )));
1491 }
1492 }
1493 }
1494
1495 for op in node_type_ops {
1496 context_index_dirty = true;
1497 let value = op.value.ok_or_else(|| {
1498 crate::RedDBError::Query("node_type operations require a value".to_string())
1499 })?;
1500
1501 match op.op {
1502 PatchEntityOperationType::Unset => {
1503 return Err(crate::RedDBError::Query(
1504 "node_type cannot be unset through patch operations".to_string(),
1505 ));
1506 }
1507 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1508 let Some(node_type) = value.as_str() else {
1509 return Err(crate::RedDBError::Query(
1510 "node_type operation requires a text value".to_string(),
1511 ));
1512 };
1513 graph_node_type = Some(node_type.to_string());
1514 modified_columns.push("node_type".to_string());
1515 }
1516 }
1517 }
1518
1519 if !field_ops.is_empty() {
1520 context_index_dirty = true;
1521 apply_patch_operations_to_storage_map(&mut node.properties, &field_ops)?;
1522 }
1523
1524 if let Some(fields) = payload
1525 .get("fields")
1526 .and_then(crate::json::Value::as_object)
1527 {
1528 context_index_dirty = true;
1529 for (key, value) in fields {
1530 node.properties
1531 .insert(key.clone(), json_to_storage_value(value)?);
1532 }
1533 }
1534
1535 if !metadata_ops.is_empty() {
1536 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1537 let metadata = patch_metadata.get_or_insert_with(|| {
1538 store.get_metadata(&collection, id).unwrap_or_default()
1539 });
1540 let mut metadata_json = metadata_to_json(metadata);
1541 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1542 .map_err(crate::RedDBError::Query)?;
1543 *metadata = metadata_from_json(&metadata_json)?;
1544 metadata_changed = true;
1545 }
1546 }
1547 crate::storage::EntityData::Edge(edge) => {
1548 let mut field_ops = Vec::new();
1549 let mut metadata_ops = Vec::new();
1550 let mut weight_ops = Vec::new();
1551
1552 for mut op in operations {
1553 let Some(root) = op.path.first().map(String::as_str) else {
1554 return Err(crate::RedDBError::Query(
1555 "patch path cannot be empty".to_string(),
1556 ));
1557 };
1558
1559 match root {
1560 "fields" | "properties" => {
1561 if op.path.len() < 2 {
1562 return Err(crate::RedDBError::Query(
1563 "patch path 'fields' requires a nested key".to_string(),
1564 ));
1565 }
1566 op.path.remove(0);
1567 field_ops.push(op);
1568 }
1569 "weight" => {
1570 if op.path.len() != 1 {
1571 return Err(crate::RedDBError::Query(
1572 "patch path 'weight' does not allow nested keys".to_string(),
1573 ));
1574 }
1575 op.path.clear();
1576 weight_ops.push(op);
1577 }
1578 "metadata" => {
1579 if op.path.len() < 2 {
1580 return Err(crate::RedDBError::Query(
1581 "patch path 'metadata' requires a nested key".to_string(),
1582 ));
1583 }
1584 op.path.remove(0);
1585 metadata_ops.push(op);
1586 }
1587 _ => {
1588 return Err(crate::RedDBError::Query(format!(
1589 "unsupported patch target '{root}' for graph edges. Use fields/*, weight, metadata/*"
1590 )));
1591 }
1592 }
1593 }
1594
1595 if !field_ops.is_empty() {
1596 context_index_dirty = true;
1597 apply_patch_operations_to_storage_map(&mut edge.properties, &field_ops)?;
1598 }
1599
1600 for op in weight_ops {
1601 context_index_dirty = true;
1602 let value = op.value.ok_or_else(|| {
1603 crate::RedDBError::Query("weight operations require a value".to_string())
1604 })?;
1605
1606 match op.op {
1607 PatchEntityOperationType::Unset => {
1608 return Err(crate::RedDBError::Query(
1609 "weight cannot be unset through patch operations".to_string(),
1610 ));
1611 }
1612 PatchEntityOperationType::Set | PatchEntityOperationType::Replace => {
1613 let Some(weight) = value.as_f64() else {
1614 return Err(crate::RedDBError::Query(
1615 "weight operation requires a numeric value".to_string(),
1616 ));
1617 };
1618 graph_edge_weight = Some(weight as f32);
1619 edge.weight = weight as f32;
1620 modified_columns.push("weight".to_string());
1621 }
1622 }
1623 }
1624
1625 if let Some(fields) = payload
1626 .get("fields")
1627 .and_then(crate::json::Value::as_object)
1628 {
1629 context_index_dirty = true;
1630 for (key, value) in fields {
1631 edge.properties
1632 .insert(key.clone(), json_to_storage_value(value)?);
1633 }
1634 }
1635
1636 if !metadata_ops.is_empty() {
1637 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1638 let metadata = patch_metadata.get_or_insert_with(|| {
1639 store.get_metadata(&collection, id).unwrap_or_default()
1640 });
1641 let mut metadata_json = metadata_to_json(metadata);
1642 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1643 .map_err(crate::RedDBError::Query)?;
1644 *metadata = metadata_from_json(&metadata_json)?;
1645 metadata_changed = true;
1646 }
1647 }
1648 crate::storage::EntityData::Vector(vector) => {
1649 let mut field_ops = Vec::new();
1650 let mut metadata_ops = Vec::new();
1651
1652 for mut op in operations {
1653 let Some(root) = op.path.first().map(String::as_str) else {
1654 return Err(crate::RedDBError::Query(
1655 "patch path cannot be empty".to_string(),
1656 ));
1657 };
1658
1659 match root {
1660 "fields" => {
1661 if op.path.len() < 2 {
1662 return Err(crate::RedDBError::Query(
1663 "patch path 'fields' requires a nested key".to_string(),
1664 ));
1665 }
1666 op.path.remove(0);
1667 let Some(target) = op.path.first().map(String::as_str) else {
1668 return Err(crate::RedDBError::Query(
1669 "patch path requires a target under fields".to_string(),
1670 ));
1671 };
1672 if !matches!(target, "dense" | "content" | "sparse") {
1673 return Err(crate::RedDBError::Query(format!(
1674 "unsupported vector patch target '{target}'"
1675 )));
1676 }
1677 field_ops.push(op);
1678 }
1679 "metadata" => {
1680 if op.path.len() < 2 {
1681 return Err(crate::RedDBError::Query(
1682 "patch path 'metadata' requires a nested key".to_string(),
1683 ));
1684 }
1685 op.path.remove(0);
1686 metadata_ops.push(op);
1687 }
1688 _ => {
1689 return Err(crate::RedDBError::Query(format!(
1690 "unsupported patch target '{root}' for vectors. Use fields/* or metadata/*"
1691 )));
1692 }
1693 }
1694 }
1695
1696 if !field_ops.is_empty() {
1697 context_index_dirty = true;
1698 apply_patch_operations_to_vector_fields(vector, &field_ops)?;
1699 }
1700
1701 if let Some(fields) = payload
1702 .get("fields")
1703 .and_then(crate::json::Value::as_object)
1704 {
1705 context_index_dirty = true;
1706 if let Some(content) =
1707 fields.get("content").and_then(crate::json::Value::as_str)
1708 {
1709 vector.content = Some(content.to_string());
1710 }
1711 if let Some(dense) = fields.get("dense") {
1712 vector.dense = dense
1713 .as_array()
1714 .ok_or_else(|| {
1715 crate::RedDBError::Query(
1716 "field 'dense' must be an array".to_string(),
1717 )
1718 })?
1719 .iter()
1720 .map(|value| {
1721 value.as_f64().map(|value| value as f32).ok_or_else(|| {
1722 crate::RedDBError::Query(
1723 "field 'dense' must contain only numbers".to_string(),
1724 )
1725 })
1726 })
1727 .collect::<Result<Vec<_>, _>>()?;
1728 }
1729 }
1730
1731 if !metadata_ops.is_empty() {
1732 ensure_non_tree_reserved_metadata_patch_paths(&metadata_ops)?;
1733 let metadata = patch_metadata.get_or_insert_with(|| {
1734 store.get_metadata(&collection, id).unwrap_or_default()
1735 });
1736 let mut metadata_json = metadata_to_json(metadata);
1737 apply_patch_operations_to_json(&mut metadata_json, &metadata_ops)
1738 .map_err(crate::RedDBError::Query)?;
1739 *metadata = metadata_from_json(&metadata_json)?;
1740 metadata_changed = true;
1741 }
1742 }
1743 crate::storage::EntityData::TimeSeries(_)
1744 | crate::storage::EntityData::QueueMessage(_) => {
1745 return Err(crate::RedDBError::Query(
1746 "patch operations are not supported for TimeSeries or QueueMessage entities"
1747 .to_string(),
1748 ));
1749 }
1750 }
1751
1752 if let Some(node_type) = graph_node_type {
1753 if let crate::storage::EntityKind::GraphNode(node) = &mut entity.kind {
1754 node.node_type = node_type;
1755 }
1756 }
1757 if let Some(weight) = graph_edge_weight {
1758 if let crate::storage::EntityKind::GraphEdge(edge) = &mut entity.kind {
1759 edge.weight = (weight * 1000.0) as u32;
1760 }
1761 }
1762
1763 if let Some(metadata) = payload
1764 .get("metadata")
1765 .and_then(crate::json::Value::as_object)
1766 {
1767 let patch_metadata = patch_metadata
1768 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1769 for (key, value) in metadata {
1770 ensure_non_tree_reserved_metadata_key(key)?;
1771 patch_metadata.set(key.clone(), json_to_metadata_value(value)?);
1772 }
1773 metadata_changed = true;
1774 }
1775
1776 for (key, value) in parse_top_level_ttl_metadata_entries(&payload)? {
1777 let patch_metadata = patch_metadata
1778 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default());
1779 if matches!(value, crate::storage::unified::MetadataValue::Null) {
1780 patch_metadata.remove(&key);
1781 } else {
1782 patch_metadata.set(key, value);
1783 }
1784 metadata_changed = true;
1785 }
1786
1787 entity.updated_at = std::time::SystemTime::now()
1788 .duration_since(std::time::UNIX_EPOCH)
1789 .unwrap_or_default()
1790 .as_secs();
1791
1792 modified_columns = dedupe_modified_columns(modified_columns);
1793
1794 Ok(AppliedEntityMutation {
1795 id,
1796 collection,
1797 entity,
1798 metadata: patch_metadata,
1799 modified_columns,
1800 persist_metadata: metadata_changed,
1801 context_index_dirty,
1802 replaced_entity: None,
1803 replaced_entity_previous_xmax: 0,
1804 pre_mutation_fields,
1805 })
1806 }
1807
1808 pub(crate) fn apply_loaded_sql_update_row_core(
1809 &self,
1810 collection: String,
1811 mut entity: crate::storage::UnifiedEntity,
1812 static_field_assignments: &[(String, Value)],
1813 dynamic_field_assignments: Vec<(String, Value)>,
1814 static_metadata_assignments: &[(String, MetadataValue)],
1815 dynamic_metadata_assignments: Vec<(String, MetadataValue)>,
1816 row_contract_plan: Option<&RowUpdateContractPlan>,
1817 row_modified_columns_template: &[String],
1818 row_touches_unique_columns: bool,
1819 ) -> RedDBResult<AppliedEntityMutation> {
1820 let id = entity.id;
1821 let previous_xmax = entity.xmax;
1822 let db = self.db();
1823 let store = db.store();
1824 let Some(_) = store.get_collection(&collection) else {
1825 return Err(crate::RedDBError::NotFound(format!(
1826 "collection not found: {collection}"
1827 )));
1828 };
1829
1830 let versioned_update_xid = match self.current_xid() {
1831 Some(xid) => Some(xid),
1832 None => {
1833 let snapshot_manager = self.snapshot_manager();
1834 let xid = snapshot_manager.begin();
1835 snapshot_manager.commit(xid);
1836 Some(xid)
1837 }
1838 };
1839 let mut replaced_entity = versioned_update_xid.map(|xid| {
1840 let mut old = entity.clone();
1841 old.set_xmax(xid);
1842 old
1843 });
1844
1845 let mut patch_metadata: Option<crate::storage::unified::Metadata> = None;
1846 let row_contract_timestamps = row_contract_plan
1847 .map(|plan| plan.timestamps_enabled)
1848 .unwrap_or(false);
1849 let mut metadata_changed = false;
1850 let mut modified_columns = row_modified_columns_template.to_vec();
1851 let mut context_index_dirty = !modified_columns.is_empty();
1852
1853 let pre_mutation_fields = entity_row_fields_snapshot(&entity);
1857
1858 let crate::storage::EntityData::Row(row) = &mut entity.data else {
1859 return Err(crate::RedDBError::Query(
1860 "SQL row update fast path requires a row entity".to_string(),
1861 ));
1862 };
1863
1864 let _ = row_contract_plan;
1865 apply_row_field_assignments_raw(row, static_field_assignments.iter().cloned());
1866 apply_row_field_assignments_raw(row, dynamic_field_assignments);
1867
1868 for (key, value) in static_metadata_assignments
1869 .iter()
1870 .cloned()
1871 .chain(dynamic_metadata_assignments)
1872 {
1873 ensure_non_tree_reserved_metadata_key(&key)?;
1874 patch_metadata
1875 .get_or_insert_with(|| store.get_metadata(&collection, id).unwrap_or_default())
1876 .set(key, value);
1877 metadata_changed = true;
1878 }
1879
1880 if !modified_columns.is_empty() || row_contract_timestamps {
1881 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
1882 if row_contract_timestamps {
1883 context_index_dirty = true;
1884 set_row_field(
1885 row,
1886 "updated_at",
1887 Value::UnsignedInteger(current_unix_ms_u64()),
1888 );
1889 modified_columns.push("updated_at".to_string());
1890 }
1891 if row_touches_unique_columns {
1892 let current_fields = collect_row_fields(row);
1893 contract.enforce_row_uniqueness(¤t_fields, Some(id))?;
1894 }
1895 }
1896
1897 entity.updated_at = std::time::SystemTime::now()
1898 .duration_since(std::time::UNIX_EPOCH)
1899 .unwrap_or_default()
1900 .as_secs();
1901
1902 if let Some(xid) = versioned_update_xid {
1903 let logical_id = entity.logical_id();
1904 entity.id = store.next_entity_id();
1905 entity.set_logical_id(logical_id);
1906 entity.set_xmin(xid);
1907 entity.set_xmax(0);
1908 if let Some(old) = replaced_entity.as_mut() {
1909 old.set_xmax(xid);
1910 }
1911 }
1912
1913 modified_columns = dedupe_modified_columns(modified_columns);
1914
1915 Ok(AppliedEntityMutation {
1916 id: entity.id,
1917 collection,
1918 entity,
1919 metadata: patch_metadata,
1920 modified_columns,
1921 persist_metadata: metadata_changed,
1922 context_index_dirty,
1923 replaced_entity,
1924 replaced_entity_previous_xmax: previous_xmax,
1925 pre_mutation_fields,
1926 })
1927 }
1928
1929 pub(crate) fn persist_applied_entity_mutations(
1930 &self,
1931 applied: &[AppliedEntityMutation],
1932 ) -> RedDBResult<()> {
1933 if applied.is_empty() {
1934 return Ok(());
1935 }
1936
1937 let store = self.db().store();
1938 let collection = &applied[0].collection;
1939 let Some(manager) = store.get_collection(collection) else {
1940 return Err(crate::RedDBError::NotFound(format!(
1941 "collection not found: {collection}"
1942 )));
1943 };
1944
1945 let mut ordinary = Vec::with_capacity(applied.len());
1946 for item in applied {
1947 if let Some(old_version) = item.replaced_entity.as_ref() {
1948 store
1949 .install_versioned_table_row_update(
1950 collection,
1951 old_version.clone(),
1952 item.entity.clone(),
1953 if item.persist_metadata {
1954 item.metadata.as_ref()
1955 } else {
1956 None
1957 },
1958 )
1959 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
1960 if self.current_xid().is_some() {
1961 self.record_pending_versioned_update(
1962 crate::runtime::impl_core::current_connection_id(),
1963 collection,
1964 old_version.id,
1965 item.entity.id,
1966 old_version.xmax,
1967 item.replaced_entity_previous_xmax,
1968 );
1969 }
1970 } else {
1971 ordinary.push(item);
1972 }
1973 }
1974 if ordinary.is_empty() {
1975 return Ok(());
1976 }
1977
1978 manager
1979 .update_hot_batch_with_metadata(ordinary.iter().map(|item| {
1980 (
1981 &item.entity,
1982 item.modified_columns.as_slice(),
1983 if item.persist_metadata {
1984 item.metadata.as_ref()
1985 } else {
1986 None
1987 },
1988 )
1989 }))
1990 .map_err(|err| crate::RedDBError::Query(err.to_string()))?;
1991
1992 let indexed_cols = self
2001 .index_store_ref()
2002 .indexed_columns_set(collection.as_str());
2003 let all_hot = !indexed_cols.is_empty()
2004 && ordinary.iter().all(|item| {
2005 !item.persist_metadata
2006 && !item
2007 .modified_columns
2008 .iter()
2009 .any(|c| indexed_cols.contains(c))
2010 })
2011 || indexed_cols.is_empty() && ordinary.iter().all(|item| !item.persist_metadata);
2012
2013 let entity_refs: Vec<&crate::storage::UnifiedEntity> =
2017 ordinary.iter().map(|item| &item.entity).collect();
2018 let persist_fn = if all_hot {
2019 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager_wal_only
2020 } else {
2021 crate::storage::unified::UnifiedStore::persist_entity_refs_to_pager
2022 };
2023 persist_fn(store.as_ref(), collection, &entity_refs)
2024 .map_err(|err| crate::RedDBError::Internal(err.to_string()))
2025 }
2026
2027 pub(crate) fn flush_applied_entity_mutation(
2028 &self,
2029 applied: &AppliedEntityMutation,
2030 ) -> RedDBResult<()> {
2031 let store = self.db().store();
2032 if applied.context_index_dirty {
2033 store
2034 .context_index()
2035 .index_entity(&applied.collection, &applied.entity);
2036 }
2037 let mut changed_columns: Option<Vec<String>> = None;
2045 if !applied.pre_mutation_fields.is_empty() {
2046 let post = entity_row_fields_snapshot(&applied.entity);
2047 if !post.is_empty() {
2048 let damage = crate::application::entity::row_damage_vector(
2049 &applied.pre_mutation_fields,
2050 &post,
2051 );
2052 if !damage.is_empty() {
2053 changed_columns = Some(
2054 damage
2055 .touched_columns()
2056 .into_iter()
2057 .map(str::to_string)
2058 .collect(),
2059 );
2060 }
2061
2062 let indexed_cols: std::collections::HashSet<String> = self
2071 .index_store_ref()
2072 .list_indices(applied.collection.as_str())
2073 .into_iter()
2074 .filter_map(|idx| idx.columns.first().cloned())
2075 .collect();
2076 let modified_cols: std::collections::HashSet<String> = damage
2077 .touched_columns()
2078 .into_iter()
2079 .map(str::to_string)
2080 .collect();
2081 if let Some(old_version) = applied.replaced_entity.as_ref() {
2082 let old_index_fields: Vec<(String, Value)> = applied
2083 .pre_mutation_fields
2084 .iter()
2085 .filter(|(col, _)| indexed_cols.contains(col))
2086 .cloned()
2087 .collect();
2088 let new_index_fields: Vec<(String, Value)> = post
2089 .iter()
2090 .filter(|(col, _)| indexed_cols.contains(col))
2091 .cloned()
2092 .collect();
2093 if !old_index_fields.is_empty() {
2094 self.index_store_ref()
2095 .index_entity_delete(
2096 &applied.collection,
2097 old_version.id,
2098 &old_index_fields,
2099 )
2100 .map_err(crate::RedDBError::Internal)?;
2101 }
2102 if !new_index_fields.is_empty() {
2103 self.index_store_ref()
2104 .index_entity_insert(
2105 &applied.collection,
2106 applied.entity.id,
2107 &new_index_fields,
2108 )
2109 .map_err(crate::RedDBError::Internal)?;
2110 }
2111 } else {
2112 let decision = crate::storage::engine::hot_update::decide(
2113 &crate::storage::engine::hot_update::HotUpdateInputs {
2114 collection: applied.collection.as_str(),
2115 indexed_columns: &indexed_cols,
2116 modified_columns: &modified_cols,
2117 new_tuple_size: 0,
2121 page_free_space: usize::MAX,
2122 },
2123 );
2124 if !decision.can_hot {
2125 self.index_store_ref()
2126 .index_entity_update(
2127 &applied.collection,
2128 applied.id,
2129 &applied.pre_mutation_fields,
2130 &post,
2131 )
2132 .map_err(crate::RedDBError::Internal)?;
2133 } else {
2134 tracing::debug!(
2138 collection = %reddb_wire::audit_safe_log_field(&applied.collection),
2139 "hot_update fast-path: skipped index_entity_update"
2140 );
2141 }
2142 }
2143 }
2144 }
2145 self.cdc_emit_prebuilt_with_columns(
2146 crate::replication::cdc::ChangeOperation::Update,
2147 &applied.collection,
2148 &applied.entity,
2149 "entity",
2150 applied.metadata.as_ref(),
2151 true,
2152 changed_columns,
2153 );
2154 Ok(())
2155 }
2156
2157 pub(crate) fn apply_loaded_patch_entity(
2158 &self,
2159 collection: String,
2160 entity: crate::storage::UnifiedEntity,
2161 payload: JsonValue,
2162 operations: Vec<PatchEntityOperation>,
2163 ) -> RedDBResult<CreateEntityOutput> {
2164 let applied =
2165 self.apply_loaded_patch_entity_core(collection, entity, payload, operations)?;
2166 self.persist_applied_entity_mutations(std::slice::from_ref(&applied))?;
2167 self.flush_applied_entity_mutation(&applied)?;
2168 Ok(CreateEntityOutput {
2169 id: applied.id,
2170 entity: Some(applied.entity),
2171 })
2172 }
2173}
2174
2175fn ensure_non_tree_reserved_metadata_patch_paths(
2176 operations: &[PatchEntityOperation],
2177) -> RedDBResult<()> {
2178 for operation in operations {
2179 let Some(key) = operation.path.first().map(String::as_str) else {
2180 continue;
2181 };
2182 ensure_non_tree_reserved_metadata_key(key)?;
2183 }
2184 Ok(())
2185}
2186
2187fn ensure_non_tree_reserved_metadata_key(key: &str) -> RedDBResult<()> {
2188 if key.starts_with(TREE_METADATA_PREFIX) {
2189 return Err(crate::RedDBError::Query(format!(
2190 "metadata key '{}' is reserved for managed trees",
2191 key
2192 )));
2193 }
2194 Ok(())
2195}
2196
2197fn ensure_non_tree_reserved_metadata_entries(
2198 metadata: &[(String, MetadataValue)],
2199) -> RedDBResult<()> {
2200 for (key, _) in metadata {
2201 ensure_non_tree_reserved_metadata_key(key)?;
2202 }
2203 Ok(())
2204}
2205
2206fn ensure_non_tree_structural_edge_label(label: &str) -> RedDBResult<()> {
2207 if label.eq_ignore_ascii_case(TREE_CHILD_EDGE_LABEL) {
2208 return Err(crate::RedDBError::Query(format!(
2209 "edge label '{}' is reserved for managed trees",
2210 TREE_CHILD_EDGE_LABEL
2211 )));
2212 }
2213 Ok(())
2214}
2215
2216impl RedDBRuntime {
2217 pub(crate) fn create_node_unchecked(
2218 &self,
2219 input: CreateNodeInput,
2220 ) -> RedDBResult<CreateEntityOutput> {
2221 let db = self.db();
2222 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2223 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2224 let mut metadata = input.metadata;
2225 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2226 let mut builder = db.node(&input.collection, &input.label);
2227
2228 if let Some(node_type) = input.node_type {
2229 builder = builder.node_type(node_type);
2230 }
2231
2232 for (key, value) in input.properties {
2233 builder = builder.property(key, value);
2234 }
2235
2236 for (key, value) in metadata {
2237 builder = builder.metadata(key, value);
2238 }
2239
2240 for embedding in input.embeddings {
2241 if let Some(model) = embedding.model {
2242 builder = builder.embedding_with_model(embedding.name, embedding.vector, model);
2243 } else {
2244 builder = builder.embedding(embedding.name, embedding.vector);
2245 }
2246 }
2247
2248 for link in input.table_links {
2249 builder = builder.link_to_table(link.key, link.table);
2250 }
2251
2252 for link in input.node_links {
2253 builder = builder.link_to_weighted(link.target, link.edge_label, link.weight);
2254 }
2255
2256 let id = builder.save()?;
2257 self.stamp_xmin_if_in_txn(&input.collection, id);
2260 refresh_context_index(&db, &input.collection, id)?;
2261 self.cdc_emit(
2262 crate::replication::cdc::ChangeOperation::Insert,
2263 &input.collection,
2264 id.raw(),
2265 "graph_node",
2266 );
2267 Ok(CreateEntityOutput {
2268 id,
2269 entity: db.store().get(&input.collection, id),
2270 })
2271 }
2272
2273 pub(crate) fn create_edge_unchecked(
2274 &self,
2275 input: CreateEdgeInput,
2276 ) -> RedDBResult<CreateEntityOutput> {
2277 let db = self.db();
2278 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2279 contract.ensure_model(crate::catalog::CollectionModel::Graph)?;
2280 let mut metadata = input.metadata;
2281 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2282 let mut builder = db
2283 .edge(&input.collection, &input.label)
2284 .from(input.from)
2285 .to(input.to);
2286
2287 if let Some(weight) = input.weight {
2288 builder = builder.weight(weight);
2289 }
2290
2291 for (key, value) in input.properties {
2292 builder = builder.property(key, value);
2293 }
2294
2295 for (key, value) in metadata {
2296 builder = builder.metadata(key, value);
2297 }
2298
2299 let id = builder.save()?;
2300 self.stamp_xmin_if_in_txn(&input.collection, id);
2303 refresh_context_index(&db, &input.collection, id)?;
2304 self.cdc_emit(
2305 crate::replication::cdc::ChangeOperation::Insert,
2306 &input.collection,
2307 id.raw(),
2308 "graph_edge",
2309 );
2310 Ok(CreateEntityOutput {
2311 id,
2312 entity: db.store().get(&input.collection, id),
2313 })
2314 }
2315}
2316
2317fn create_rows_batch_prevalidated_columnar_with_outputs(
2318 runtime: &RedDBRuntime,
2319 collection: String,
2320 column_names: std::sync::Arc<Vec<String>>,
2321 rows: Vec<Vec<crate::storage::schema::Value>>,
2322) -> RedDBResult<Vec<CreateEntityOutput>> {
2323 use crate::storage::{
2324 unified::{EntityData, EntityKind, RowData},
2325 EntityId, UnifiedEntity,
2326 };
2327 use std::sync::Arc;
2328
2329 if rows.is_empty() {
2330 return Ok(Vec::new());
2331 }
2332 runtime.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2333 runtime.check_batch_size(rows.len())?;
2334 runtime.check_db_size()?;
2335
2336 let db = runtime.db();
2337 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2338 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2339
2340 let store = db.store();
2341 let table_arc: Arc<str> = Arc::from(collection.as_str());
2342
2343 let indexed_cols = runtime
2344 .index_store_ref()
2345 .indexed_columns_set(collection.as_str());
2346 let has_secondary_indexes = !indexed_cols.is_empty();
2347 let mut field_snapshots: Vec<Vec<(String, crate::storage::schema::Value)>> =
2348 if has_secondary_indexes {
2349 Vec::with_capacity(rows.len())
2350 } else {
2351 Vec::new()
2352 };
2353
2354 let entities: Vec<UnifiedEntity> = rows
2355 .into_iter()
2356 .map(|values| {
2357 if has_secondary_indexes {
2358 let mut snap: Vec<(String, crate::storage::schema::Value)> =
2359 Vec::with_capacity(indexed_cols.len());
2360 for (name, val) in column_names.iter().zip(values.iter()) {
2361 if indexed_cols.contains(name) {
2362 snap.push((name.clone(), val.clone()));
2363 }
2364 }
2365 field_snapshots.push(snap);
2366 }
2367 let mut row = RowData::new(values);
2368 row.schema = Some(Arc::clone(&column_names));
2369 UnifiedEntity::new(
2370 EntityId::new(0),
2371 EntityKind::TableRow {
2372 table: Arc::clone(&table_arc),
2373 row_id: 0,
2374 },
2375 EntityData::Row(row),
2376 )
2377 })
2378 .collect();
2379
2380 let ids = store
2381 .bulk_insert(&collection, entities)
2382 .map_err(|e| crate::RedDBError::Internal(format!("{e:?}")))?;
2383
2384 if has_secondary_indexes {
2385 let index_rows: Vec<(EntityId, Vec<(String, crate::storage::schema::Value)>)> = ids
2386 .iter()
2387 .zip(field_snapshots)
2388 .map(|(id, fields)| (*id, fields))
2389 .collect();
2390 runtime
2391 .index_store_ref()
2392 .index_entity_insert_batch(&collection, &index_rows)
2393 .map_err(crate::RedDBError::Internal)?;
2394 }
2395
2396 runtime.invalidate_result_cache();
2397 runtime.cdc_emit_insert_batch_no_cache_invalidate(&collection, &ids, "table");
2398
2399 Ok(ids
2400 .into_iter()
2401 .map(|id| CreateEntityOutput { id, entity: None })
2402 .collect())
2403}
2404
2405impl RuntimeEntityPort for RedDBRuntime {
2406 fn create_row(&self, input: CreateRowInput) -> RedDBResult<CreateEntityOutput> {
2407 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2408 let db = self.db();
2409 let CreateRowInput {
2410 collection,
2411 fields,
2412 metadata: input_metadata,
2413 node_links,
2414 vector_links,
2415 } = input;
2416 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2417 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2418 let mut metadata = input_metadata;
2419 apply_collection_default_ttl(&db, &collection, &mut metadata);
2420 let fields = contract.normalize_insert_fields(fields)?;
2421 contract.enforce_row_uniqueness(&fields, None)?;
2422 let engine = self.mutation_engine();
2424 let result = engine.apply(
2425 collection.clone(),
2426 vec![crate::runtime::mutation::MutationRow {
2427 fields,
2428 metadata,
2429 node_links,
2430 vector_links,
2431 }],
2432 )?;
2433 let id = result.ids[0];
2434 Ok(CreateEntityOutput {
2439 id,
2440 entity: db.store().get(&collection, id),
2441 })
2442 }
2443
2444 fn create_rows_batch(
2445 &self,
2446 input: CreateRowsBatchInput,
2447 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2448 if input.rows.is_empty() {
2449 return Ok(Vec::new());
2450 }
2451 self.check_batch_size(input.rows.len())?;
2452 self.check_db_size()?;
2453
2454 let db = self.db();
2455 let collection = input.collection;
2456 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2457 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2458
2459 let mut prepared_rows = Vec::with_capacity(input.rows.len());
2460 let mut uniqueness_rows = Vec::with_capacity(input.rows.len());
2461 for row in input.rows {
2462 if row.collection != collection {
2463 return Err(crate::RedDBError::Query(format!(
2464 "batch row collection mismatch: expected '{}', got '{}'",
2465 collection, row.collection
2466 )));
2467 }
2468
2469 let mut metadata = row.metadata;
2470 apply_collection_default_ttl(&db, &collection, &mut metadata);
2471 let fields = contract.normalize_insert_fields(row.fields)?;
2472 contract.enforce_row_uniqueness(&fields, None)?;
2473 uniqueness_rows.push(fields.clone());
2474 prepared_rows.push((fields, metadata, row.node_links, row.vector_links));
2475 }
2476
2477 contract.enforce_batch_uniqueness(&uniqueness_rows)?;
2478
2479 let engine = {
2482 let e = self.mutation_engine();
2483 if input.suppress_events {
2484 e.with_suppress_events()
2485 } else {
2486 e
2487 }
2488 };
2489 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> = prepared_rows
2490 .into_iter()
2491 .map(|(fields, metadata, node_links, vector_links)| {
2492 crate::runtime::mutation::MutationRow {
2493 fields,
2494 metadata,
2495 node_links,
2496 vector_links,
2497 }
2498 })
2499 .collect();
2500
2501 let result = engine
2502 .apply(collection.clone(), mutation_rows)
2503 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2504
2505 let store = db.store();
2506 Ok(result
2507 .ids
2508 .into_iter()
2509 .map(|id| CreateEntityOutput {
2510 id,
2511 entity: store.get(&collection, id),
2512 })
2513 .collect())
2514 }
2515
2516 fn create_rows_batch_prevalidated_columnar(
2517 &self,
2518 collection: String,
2519 column_names: std::sync::Arc<Vec<String>>,
2520 rows: Vec<Vec<crate::storage::schema::Value>>,
2521 ) -> RedDBResult<usize> {
2522 create_rows_batch_prevalidated_columnar_with_outputs(self, collection, column_names, rows)
2523 .map(|outputs| outputs.len())
2524 }
2525
2526 fn create_rows_batch_columnar(
2527 &self,
2528 collection: String,
2529 column_names: std::sync::Arc<Vec<String>>,
2530 rows: Vec<Vec<crate::storage::schema::Value>>,
2531 ) -> RedDBResult<usize> {
2532 self.create_rows_batch_columnar_with_outputs(collection, column_names, rows)
2533 .map(|outputs| outputs.len())
2534 }
2535
2536 fn create_rows_batch_columnar_with_outputs(
2537 &self,
2538 collection: String,
2539 column_names: std::sync::Arc<Vec<String>>,
2540 rows: Vec<Vec<crate::storage::schema::Value>>,
2541 ) -> RedDBResult<Vec<CreateEntityOutput>> {
2542 if rows.is_empty() {
2543 return Ok(Vec::new());
2544 }
2545 self.check_batch_size(rows.len())?;
2546 self.check_db_size()?;
2547
2548 let db = self.db();
2549 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2550 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2551
2552 let needs_normalisation = match db.collection_contract(&collection) {
2562 Some(c) => {
2563 c.declared_model == crate::catalog::CollectionModel::Table
2564 && (!c.declared_columns.is_empty()
2565 || c.table_def
2566 .as_ref()
2567 .map(|t| !t.columns.is_empty())
2568 .unwrap_or(false))
2569 }
2570 None => false,
2571 };
2572 if !needs_normalisation {
2573 return create_rows_batch_prevalidated_columnar_with_outputs(
2574 self,
2575 collection,
2576 column_names,
2577 rows,
2578 );
2579 }
2580
2581 let ncols = column_names.len();
2588 let tuple_rows: Vec<CreateRowInput> = rows
2589 .into_iter()
2590 .map(|values| {
2591 let mut fields: Vec<(String, crate::storage::schema::Value)> =
2592 Vec::with_capacity(ncols);
2593 for (name, value) in column_names.iter().zip(values) {
2594 fields.push((name.clone(), value));
2595 }
2596 CreateRowInput {
2597 collection: collection.clone(),
2598 fields,
2599 metadata: Vec::new(),
2600 node_links: Vec::new(),
2601 vector_links: Vec::new(),
2602 }
2603 })
2604 .collect();
2605 self.create_rows_batch(CreateRowsBatchInput {
2606 collection,
2607 rows: tuple_rows,
2608 suppress_events: false,
2609 })
2610 }
2611
2612 fn create_rows_batch_prevalidated(&self, input: CreateRowsBatchInput) -> RedDBResult<usize> {
2613 if input.rows.is_empty() {
2614 return Ok(0);
2615 }
2616 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2617 self.check_batch_size(input.rows.len())?;
2618 self.check_db_size()?;
2619
2620 let db = self.db();
2621 let collection = input.collection;
2622 let contract = CollectionContractWriteEnforcer::new(&db, &collection);
2627 contract.ensure_model(crate::catalog::CollectionModel::Table)?;
2628
2629 let default_ttl_ms = db.collection_default_ttl_ms(&collection);
2635
2636 let mutation_rows: Vec<crate::runtime::mutation::MutationRow> =
2637 Vec::with_capacity(input.rows.len());
2638 let mut mutation_rows = mutation_rows;
2639 for row in input.rows {
2640 if row.collection != collection {
2641 return Err(crate::RedDBError::Query(format!(
2642 "batch row collection mismatch: expected '{}', got '{}'",
2643 collection, row.collection
2644 )));
2645 }
2646 let mut metadata = row.metadata;
2647 if let Some(ttl) = default_ttl_ms {
2648 if !has_internal_ttl_metadata(&metadata) {
2649 metadata.push((
2650 "_ttl_ms".to_string(),
2651 if ttl <= i64::MAX as u64 {
2652 MetadataValue::Int(ttl as i64)
2653 } else {
2654 MetadataValue::Timestamp(ttl)
2655 },
2656 ));
2657 }
2658 }
2659 mutation_rows.push(crate::runtime::mutation::MutationRow {
2660 fields: row.fields,
2661 metadata,
2662 node_links: row.node_links,
2663 vector_links: row.vector_links,
2664 });
2665 }
2666
2667 let engine = self.mutation_engine();
2668 let result = engine
2669 .apply(collection, mutation_rows)
2670 .map_err(|e| crate::RedDBError::Internal(e.to_string()))?;
2671 Ok(result.ids.len())
2672 }
2673
2674 fn create_node(&self, input: CreateNodeInput) -> RedDBResult<CreateEntityOutput> {
2675 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2676 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2677 input.properties.iter().map(|(key, _)| key.as_str()),
2678 &format!("node '{}'", input.collection),
2679 )?;
2680 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2681 self.create_node_unchecked(input)
2682 }
2683
2684 fn create_edge(&self, input: CreateEdgeInput) -> RedDBResult<CreateEntityOutput> {
2685 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2686 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2687 input.properties.iter().map(|(key, _)| key.as_str()),
2688 &format!("edge '{}'", input.collection),
2689 )?;
2690 ensure_non_tree_structural_edge_label(&input.label)?;
2691 ensure_non_tree_reserved_metadata_entries(&input.metadata)?;
2692 self.create_edge_unchecked(input)
2693 }
2694
2695 fn create_vector(&self, input: CreateVectorInput) -> RedDBResult<CreateEntityOutput> {
2696 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2697 let db = self.db();
2698 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2699 contract.ensure_model(crate::catalog::CollectionModel::Vector)?;
2700 ensure_vector_dimension_contract(&db, &input.collection, input.dense.len())?;
2701 let mut metadata = input.metadata;
2702 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2703 let mut builder = db.vector(&input.collection).dense(input.dense);
2704
2705 if let Some(content) = input.content {
2706 builder = builder.content(content);
2707 }
2708
2709 for (key, value) in metadata {
2710 builder = builder.metadata(key, value);
2711 }
2712
2713 if let Some(link_row) = input.link_row {
2714 builder = builder.link_to_table(link_row);
2715 }
2716
2717 if let Some(link_node) = input.link_node {
2718 builder = builder.link_to_node(link_node);
2719 }
2720
2721 let id = builder.save()?;
2722 let dense_for_turbo = match db.store().get(&input.collection, id).map(|e| e.data) {
2723 Some(crate::storage::unified::EntityData::Vector(data)) => Some(data.dense.clone()),
2724 _ => None,
2725 };
2726 self.stamp_xmin_if_in_txn(&input.collection, id);
2729 refresh_context_index(&db, &input.collection, id)?;
2730 if let (Some(state), Some(dense)) = (db.turbo_state(&input.collection), dense_for_turbo) {
2736 state.ensure_populated(&db.store(), &input.collection);
2740 use crate::runtime::turbo_crash_inject::{fire, InjectionPoint};
2742 fire(InjectionPoint::BeforeWalFsync);
2743 let _ = db
2744 .store()
2745 .append_vector_insert_record(&input.collection, id.raw(), &dense);
2746 fire(InjectionPoint::BeforeIndexCommit);
2747 let mut index = state.index.lock();
2748 index.insert(id, dense.clone());
2749 fire(InjectionPoint::BeforeExtentFsync);
2756 if let Some(extent) = state.extent.lock().as_mut() {
2757 let packed = index.encode_for_extent(&dense);
2758 let _ = extent.append(&packed);
2759 }
2760 }
2761 self.cdc_emit(
2762 crate::replication::cdc::ChangeOperation::Insert,
2763 &input.collection,
2764 id.raw(),
2765 "vector",
2766 );
2767 Ok(CreateEntityOutput {
2768 id,
2769 entity: db.store().get(&input.collection, id),
2770 })
2771 }
2772
2773 fn create_document(&self, input: CreateDocumentInput) -> RedDBResult<CreateEntityOutput> {
2774 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2775 let db = self.db();
2776 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2777 contract.ensure_model(crate::catalog::CollectionModel::Document)?;
2778
2779 if let JsonValue::Object(ref map) = input.body {
2780 crate::reserved_fields::ensure_no_reserved_public_item_fields(
2781 map.keys().map(String::as_str),
2782 &format!("document '{}'", input.collection),
2783 )?;
2784 }
2785
2786 let body_bytes = json_to_vec(&input.body).map_err(|err| {
2788 crate::RedDBError::Query(format!("failed to serialize document body: {err}"))
2789 })?;
2790 let mut fields: Vec<(String, crate::storage::schema::Value)> = vec![(
2791 "body".to_string(),
2792 crate::storage::schema::Value::Json(body_bytes),
2793 )];
2794
2795 if let JsonValue::Object(ref map) = input.body {
2797 for (key, value) in map {
2798 let storage_value = json_to_storage_value(value)?;
2799 fields.push((key.clone(), storage_value));
2800 }
2801 }
2802
2803 let mut metadata = input.metadata;
2804 apply_collection_default_ttl(&db, &input.collection, &mut metadata);
2805 let columns: Vec<(&str, crate::storage::schema::Value)> = fields
2806 .iter()
2807 .map(|(key, value)| (key.as_str(), value.clone()))
2808 .collect();
2809 let mut builder = db.row(&input.collection, columns);
2810
2811 for (key, value) in metadata {
2812 builder = builder.metadata(key, value);
2813 }
2814
2815 for node in input.node_links {
2816 builder = builder.link_to_node(node);
2817 }
2818
2819 for vector in input.vector_links {
2820 builder = builder.link_to_vector(vector);
2821 }
2822
2823 let id = builder.save()?;
2824 self.stamp_xmin_if_in_txn(&input.collection, id);
2826 refresh_context_index(&db, &input.collection, id)?;
2827 self.cdc_emit(
2828 crate::replication::cdc::ChangeOperation::Insert,
2829 &input.collection,
2830 id.raw(),
2831 "document",
2832 );
2833 Ok(CreateEntityOutput {
2834 id,
2835 entity: db.store().get(&input.collection, id),
2836 })
2837 }
2838
2839 fn create_kv(&self, input: CreateKvInput) -> RedDBResult<CreateEntityOutput> {
2840 let db = self.db();
2841 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2842 let declared_model = db
2843 .collection_contract(&input.collection)
2844 .map(|contract| contract.declared_model);
2845 let value = if declared_model == Some(crate::catalog::CollectionModel::Vault) {
2846 contract.ensure_model(crate::catalog::CollectionModel::Vault)?;
2847 self.seal_vault_value(&input.collection, input.value)?
2848 } else {
2849 contract.ensure_model(crate::catalog::CollectionModel::Kv)?;
2850 input.value
2851 };
2852 let fields = vec![
2853 (
2854 "key".to_string(),
2855 crate::storage::schema::Value::text(input.key),
2856 ),
2857 ("value".to_string(), value),
2858 ];
2859 let collection = input.collection;
2860 let result = self.mutation_engine().apply(
2861 collection.clone(),
2862 vec![crate::runtime::mutation::MutationRow {
2863 fields,
2864 metadata: input.metadata,
2865 node_links: Vec::new(),
2866 vector_links: Vec::new(),
2867 }],
2868 )?;
2869 let id = result.ids[0];
2870 Ok(CreateEntityOutput {
2871 id,
2872 entity: db.store().get(&collection, id),
2873 })
2874 }
2875
2876 fn create_timeseries_point(
2877 &self,
2878 input: CreateTimeSeriesPointInput,
2879 ) -> RedDBResult<CreateEntityOutput> {
2880 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2881 let db = self.db();
2882 let contract = CollectionContractWriteEnforcer::new(&db, &input.collection);
2883 contract.ensure_model(crate::catalog::CollectionModel::TimeSeries)?;
2884
2885 let mut fields = vec![
2886 (
2887 "metric".to_string(),
2888 crate::storage::schema::Value::text(input.metric),
2889 ),
2890 (
2891 "value".to_string(),
2892 crate::storage::schema::Value::Float(input.value),
2893 ),
2894 ];
2895
2896 if let Some(timestamp_ns) = input.timestamp_ns {
2897 fields.push((
2898 "timestamp".to_string(),
2899 crate::storage::schema::Value::UnsignedInteger(timestamp_ns),
2900 ));
2901 }
2902
2903 if !input.tags.is_empty() {
2904 let tags_json = JsonValue::Object(
2905 input
2906 .tags
2907 .into_iter()
2908 .map(|(key, value)| (key, JsonValue::String(value)))
2909 .collect(),
2910 );
2911 let tags_bytes = json_to_vec(&tags_json).map_err(|err| {
2912 crate::RedDBError::Query(format!("failed to serialize timeseries tags: {err}"))
2913 })?;
2914 fields.push((
2915 "tags".to_string(),
2916 crate::storage::schema::Value::Json(tags_bytes),
2917 ));
2918 }
2919
2920 let collection = input.collection;
2921 let id = self.insert_timeseries_point(&collection, fields, input.metadata)?;
2922 self.stamp_xmin_if_in_txn(&collection, id);
2925 refresh_context_index(&db, &collection, id)?;
2926
2927 Ok(CreateEntityOutput {
2928 id,
2929 entity: db.store().get(&collection, id),
2930 })
2931 }
2932
2933 fn get_kv(
2934 &self,
2935 collection: &str,
2936 key: &str,
2937 ) -> RedDBResult<Option<(crate::storage::schema::Value, crate::storage::EntityId)>> {
2938 let db = self.db();
2939 ensure_collection_model_read(&db, collection, crate::catalog::CollectionModel::Kv)?;
2940 let store = db.store();
2941 let Some(manager) = store.get_collection(collection) else {
2942 return Ok(None);
2943 };
2944 let entities = manager.query_all(|_| true);
2945 for entity in entities {
2946 if let crate::storage::EntityData::Row(ref row) = entity.data {
2947 if let Some(ref named) = row.named {
2948 if let Some(crate::storage::schema::Value::Text(ref k)) = named.get("key") {
2949 if &**k == key {
2950 let value = named
2951 .get("value")
2952 .cloned()
2953 .unwrap_or(crate::storage::schema::Value::Null);
2954 return Ok(Some((value, entity.id)));
2955 }
2956 }
2957 }
2958 }
2959 }
2960 Ok(None)
2961 }
2962
2963 fn delete_kv(&self, collection: &str, key: &str) -> RedDBResult<bool> {
2964 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2965 let found = self.get_kv(collection, key)?;
2966 if let Some((_, id)) = found {
2967 let db = self.db();
2968 let store = db.store();
2969 let deleted = store
2970 .delete(collection, id)
2971 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
2972 if deleted {
2973 store.context_index().remove_entity(id);
2974 }
2975 Ok(deleted)
2976 } else {
2977 Ok(false)
2978 }
2979 }
2980
2981 fn patch_entity(&self, input: PatchEntityInput) -> RedDBResult<CreateEntityOutput> {
2982 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
2983 let PatchEntityInput {
2984 collection,
2985 id,
2986 payload,
2987 operations,
2988 } = input;
2989 let db = self.db();
2990 let store = db.store();
2991 let Some(manager) = store.get_collection(&collection) else {
2992 return Err(crate::RedDBError::NotFound(format!(
2993 "collection not found: {collection}"
2994 )));
2995 };
2996 let Some(entity) = manager.get(id) else {
2997 return Err(crate::RedDBError::NotFound(format!(
2998 "entity not found: {}",
2999 id.raw()
3000 )));
3001 };
3002 self.apply_loaded_patch_entity(collection, entity, payload, operations)
3003 }
3004
3005 fn delete_entity(&self, input: DeleteEntityInput) -> RedDBResult<DeleteEntityOutput> {
3006 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
3007 let store = self.db().store();
3008 let pre_delete_fields = store
3012 .get(&input.collection, input.id)
3013 .as_ref()
3014 .map(entity_row_fields_snapshot)
3015 .unwrap_or_default();
3016 let deleted = store
3020 .delete(&input.collection, input.id)
3021 .map_err(|err| crate::RedDBError::Internal(err.to_string()))?;
3022 if deleted {
3023 store.context_index().remove_entity(input.id);
3024 if !pre_delete_fields.is_empty() {
3027 self.index_store_ref()
3028 .index_entity_delete(&input.collection, input.id, &pre_delete_fields)
3029 .map_err(crate::RedDBError::Internal)?;
3030 }
3031 self.cdc_emit(
3032 crate::replication::cdc::ChangeOperation::Delete,
3033 &input.collection,
3034 input.id.raw(),
3035 "entity",
3036 );
3037 }
3038 Ok(DeleteEntityOutput {
3039 deleted,
3040 id: input.id,
3041 })
3042 }
3043}